mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-29 19:12:03 +00:00
881ac7208e
Add no-distributed-cache tag in tests
3550 lines
123 KiB
Python
Executable File
3550 lines
123 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
# pylint: disable=global-variable-not-assigned
|
|
# pylint: disable=too-many-lines
|
|
# pylint: disable=anomalous-backslash-in-string
|
|
# pylint: disable=protected-access
|
|
|
|
import copy
|
|
import enum
|
|
import tempfile
|
|
import glob
|
|
|
|
# Not requests, to avoid requiring extra dependency.
|
|
import http.client
|
|
import itertools
|
|
import json
|
|
import math
|
|
import multiprocessing
|
|
import os
|
|
import os.path
|
|
import platform
|
|
import random
|
|
import re
|
|
import shutil
|
|
import signal
|
|
import socket
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import traceback
|
|
import urllib.parse
|
|
|
|
# for crc32
|
|
import zlib
|
|
from argparse import ArgumentParser
|
|
from datetime import datetime, timedelta
|
|
from errno import ESRCH
|
|
from subprocess import PIPE, Popen
|
|
from time import sleep, time
|
|
from typing import Dict, List, Optional, Set, Tuple, Union
|
|
|
|
# termcolor is optional: when the import fails the name is bound to None and
# colored() returns its text unchanged.
try:
    import termcolor  # type: ignore
except ImportError:
    termcolor = None

# jinja2 is optional as well; USE_JINJA records whether it could be imported.
try:
    import jinja2
except ImportError:
    USE_JINJA = False
    print("WARNING: jinja2 not installed! Template tests will be skipped.")
else:
    USE_JINJA = True

# Substrings that need_retry() looks for in a test's stdout/stderr.
MESSAGES_TO_RETRY = [
    "ConnectionPoolWithFailover: Connection failed at try",
    "DB::Exception: New table appeared in database being dropped or detached. Try again",
    "is already started to be removing by another replica right now",
    # This is from LSan, and it indicates its own internal problem:
    "Unable to get registers from thread",
]

# Compared against the run count in TestResult.check_if_need_retry().
MAX_RETRIES = 3

TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]

# One to four dot-separated groups of digits, e.g. "24.3" or "24.3.1.5".
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"

# Interpolated into FailureReason.TOO_LONG.
TEST_MAX_RUN_TIME_IN_SECONDS = 180
|
|
|
|
|
|
class SharedEngineReplacer:
    """Context manager that temporarily rewrites table engines in a file.

    The constructor copies ``file_name`` to a temporary backup and rewrites
    the file in place, turning ``Replicated*MergeTree`` and/or plain
    ``*MergeTree`` engines into their ``Shared*MergeTree`` counterparts.
    Leaving the ``with`` block moves the backup back over ``file_name``.
    Files whose name contains ``shared_merge_tree`` are never touched.
    """

    ENGINES_NON_REPLICATED_REGEXP = r"[ =]((Collapsing|VersionedCollapsing|Summing|Replacing|Aggregating|)MergeTree\(?\)?)"
    ENGINES_MAPPING_REPLICATED = [
        ("ReplicatedMergeTree", "SharedMergeTree"),
        ("ReplicatedCollapsingMergeTree", "SharedCollapsingMergeTree"),
        (
            "ReplicatedVersionedCollapsingMergeTree",
            "SharedVersionedCollapsingMergeTree",
        ),
        ("ReplicatedSummingMergeTree", "SharedSummingMergeTree"),
        ("ReplicatedReplacingMergeTree", "SharedReplacingMergeTree"),
        ("ReplicatedAggregatingMergeTree", "SharedAggregatingMergeTree"),
    ]
    NEW_SYNTAX_REPLICATED_MERGE_TREE_RE = (
        r"Replicated[a-zA-Z]*MergeTree\((\\?'.*\\?')?,?(\\?'.*\\?')?[a-zA-Z, _}{]*\)"
    )
    OLD_SYNTAX_OR_ARGUMENTS_RE = r"Tree\(.*[0-9]+.*\)"

    def _check_replicad_new_syntax(self, line):
        """Return True if ``line`` matches NEW_SYNTAX_REPLICATED_MERGE_TREE_RE."""
        return re.search(self.NEW_SYNTAX_REPLICATED_MERGE_TREE_RE, line) is not None

    def _check_old_syntax_or_arguments(self, line):
        """Return True if ``line`` matches OLD_SYNTAX_OR_ARGUMENTS_RE."""
        return re.search(self.OLD_SYNTAX_OR_ARGUMENTS_RE, line) is not None

    @staticmethod
    def _is_comment_line(line):
        """Return True for lines that must be copied through unchanged.

        NOTE: despite the name, the check is whether the line starts with
        ``SELECT``/``select``.
        """
        return line.startswith(("SELECT", "select"))

    @staticmethod
    def _is_create_query(line):
        """Return True if ``line`` starts with CREATE or ENGINE (upper or lower case)."""
        return line.startswith(("CREATE", "create", "ENGINE", "engine"))

    def _replace_non_replicated(self, line, escape_quotes, use_random_path):
        """Replace a non-replicated ``*MergeTree`` engine in ``line`` with ``Shared*MergeTree``.

        The generated engine gets two arguments: a path derived from the file's
        base name (``_`` turned into ``/``), the current PID and, when
        ``use_random_path`` is set, a random number in [1, 1000]; and the
        literal ``'1'``. With ``escape_quotes`` the single quotes are emitted
        as ``\\'``. Lines that do not match, or that match
        OLD_SYNTAX_OR_ARGUMENTS_RE, are returned unchanged.
        """
        match = re.search(self.ENGINES_NON_REPLICATED_REGEXP, line)
        if match is None or self._check_old_syntax_or_arguments(line):
            return line

        non_replicated_engine = match.group(1)
        basename_no_ext = os.path.splitext(os.path.basename(self.file_name))[0]

        path_parts = [basename_no_ext.replace("_", "/"), str(os.getpid())]
        if use_random_path:
            path_parts.append(str(random.randint(1, 1000)))
        shared_path = "/" + os.path.join(*path_parts)

        quote = "\\'" if escape_quotes else "'"
        shared_engine = (
            "Shared"
            + non_replicated_engine.replace("()", "")
            + f"({quote}{shared_path}{quote}, {quote}1{quote})"
        )
        return line.replace(non_replicated_engine, shared_engine)

    def _need_to_replace_something(self):
        """Return whether any replacement is enabled for this file."""
        return (
            self.replace_replicated or self.replace_non_replicated
        ) and "shared_merge_tree" not in self.file_name

    def _has_show_create_table(self):
        """Return a match object if the file contains ``show create table`` (any case), else None."""
        with open(self.file_name, "r", encoding="utf-8") as f:
            return re.search("show create table", f.read(), re.IGNORECASE)

    def _rewrite_file(self, reference_file, use_random_path):
        """Regenerate ``file_name`` from the backup, replacing engines line by line."""
        with open(
            self.file_name, "w", newline="", encoding="utf-8"
        ) as modified, open(
            self.temp_file_path, "r", newline="", encoding="utf-8"
        ) as source:
            for line in source:
                if self._is_comment_line(line) or (
                    reference_file and not self._is_create_query(line)
                ):
                    modified.write(line)
                    continue

                if self.replace_replicated:
                    for (
                        engine_from,
                        engine_to,
                    ) in SharedEngineReplacer.ENGINES_MAPPING_REPLICATED:
                        if engine_from in line and (
                            self._check_replicad_new_syntax(line)
                            or engine_from + " " in line
                            or engine_from + ";" in line
                        ):
                            line = line.replace(engine_from, engine_to)
                            break

                if self.replace_non_replicated:
                    # For reference files the quotes are escaped.
                    line = self._replace_non_replicated(
                        line, reference_file, use_random_path
                    )

                modified.write(line)

    def __init__(
        self, file_name, replace_replicated, replace_non_replicated, reference_file
    ):
        """Back up ``file_name`` and rewrite it in place if a replacement is enabled.

        Args:
            file_name: path of the file to rewrite.
            replace_replicated: replace ``Replicated*MergeTree`` engines.
            replace_non_replicated: replace plain ``*MergeTree`` engines.
            reference_file: truthy when the file is a reference file; only
                CREATE/ENGINE lines are rewritten and quotes are escaped.
        """
        self.file_name = file_name
        self.temp_file_path = get_temp_file_path()
        self.replace_replicated = replace_replicated
        self.replace_non_replicated = replace_non_replicated

        if not self._need_to_replace_something():
            return

        # Computed only when a rewrite will happen, so the no-op path does not
        # read (and decode) the whole file for nothing.
        use_random_path = not reference_file and not self._has_show_create_table()

        shutil.copyfile(self.file_name, self.temp_file_path)
        shutil.copymode(self.file_name, self.temp_file_path)

        try:
            self._rewrite_file(reference_file, use_random_path)
        except BaseException:
            # __exit__ never runs when __init__ raises, so put the original
            # content back here instead of leaving a half-written file.
            shutil.move(self.temp_file_path, self.file_name)
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Restore the original file from the backup if it was rewritten."""
        if not self._need_to_replace_something():
            return
        shutil.move(self.temp_file_path, self.file_name)
|
|
|
|
|
|
def get_temp_file_path():
    """Return a candidate path inside the default temp directory.

    Only the path is built; no file is created here.
    """
    directory = tempfile._get_default_tempdir()
    candidate_name = next(tempfile._get_candidate_names())
    return os.path.join(directory, candidate_name)
|
|
|
|
|
|
def stringhash(s: str) -> int:
    """Return the CRC32 of the UTF-8 encoding of ``s``.

    The built-in hash() is consistent only within one process invocation
    (https://stackoverflow.com/a/42089311), hence CRC32 is used instead.
    """
    encoded = s.encode("utf-8")
    return zlib.crc32(encoded)
|
|
|
|
|
|
def read_file_as_binary_string(file_path):
    """Return the entire content of ``file_path`` as bytes."""
    with open(file_path, "rb") as handle:
        return handle.read()
|
|
|
|
|
|
# First and last lines of the log
|
|
def trim_for_log(s):
    """Keep only the first and last 5000 lines of ``s`` when it has more than 10000.

    The removed middle part is replaced by a single separator line stating how
    many lines were hidden. Falsy input is returned as is; otherwise the
    result is the lines of ``s`` joined with "\\n".
    """
    if not s:
        return s

    lines = s.splitlines()
    hidden_count = len(lines) - 10000
    if hidden_count <= 0:
        return "\n".join(lines)

    dashes = "-" * 40
    separator = dashes + str(hidden_count) + " lines are hidden" + dashes
    return "\n".join([*lines[:5000], separator, *lines[-5000:]])
|
|
|
|
|
|
def is_valid_utf_8(fname):
    """Return True if the bytes of file ``fname`` decode as UTF-8, False otherwise."""
    with open(fname, "rb") as handle:
        raw = handle.read()
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True
|
|
|
|
|
|
class TestException(Exception):
    """Exception type of the test runner; adds nothing to ``Exception``."""
|
|
|
|
|
|
class HTTPError(Exception):
    """Error carrying a message and a status code.

    Rendered as ``Code: <code>. <message>``.
    """

    def __init__(self, message=None, code=None):
        super().__init__(message)
        self.message = message
        self.code = code

    def __str__(self):
        return "Code: {}. {}".format(self.code, self.message)
|
|
|
|
|
|
# Helpers to execute queries via HTTP interface.
|
|
def clickhouse_execute_http(
    base_args,
    query,
    body=None,
    timeout=30,
    settings=None,
    default_format=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run ``query`` through the HTTP interface and return the raw response body.

    Args:
        base_args: object providing ``tcp_host``, ``http_port`` and
            ``client_options_query_str``.
        query: query text, sent as the ``query`` URL parameter.
        body: optional POST body.
        timeout: socket timeout; also sent (as int) for the server-side
            connect/receive/send timeout settings.
        settings: optional dict of extra URL parameters; overrides the defaults.
        default_format: optional value for the ``default_format`` parameter.
        max_http_retries: number of attempts made when the request raises.
        retry_error_codes: when True, a non-200 response is retried as well.

    Returns:
        The response body as bytes.

    Raises:
        HTTPError: the last response had a status other than 200.
        Exception: whatever the last attempt raised, once retries are exhausted.
    """
    # NOTE(review): HTTPS vs HTTP is chosen from the module-level `args`, not
    # from `base_args` - confirm this is intended.
    if args.secure:
        connection_class = http.client.HTTPSConnection
    else:
        connection_class = http.client.HTTPConnection
    client = connection_class(
        host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
    )

    timeout = int(timeout)
    params = {
        "query": query,
        # hung check in stress tests may remove the database,
        # hence we should use 'system'.
        "database": "system",
        "connect_timeout": timeout,
        "receive_timeout": timeout,
        "send_timeout": timeout,
        "http_connection_timeout": timeout,
        "http_receive_timeout": timeout,
        "http_send_timeout": timeout,
        "output_format_parallel_formatting": 0,
    }
    if settings is not None:
        params.update(settings)
    if default_format is not None:
        params["default_format"] = default_format

    # The URL does not change between attempts, so build it once.
    url = f"/?{base_args.client_options_query_str}{urllib.parse.urlencode(params)}"

    try:
        for i in range(max_http_retries):
            try:
                client.request("POST", url, body=body)
                res = client.getresponse()
                data = res.read()
                if res.status == 200 or (not retry_error_codes):
                    break
            except Exception:
                if i == max_http_retries - 1:
                    raise
                # Drop the broken connection; http.client reconnects on the
                # next request().
                client.close()
                sleep(i + 1)
    finally:
        # The body has been read in full (or the call failed), so the socket
        # is no longer needed; close it instead of leaking it.
        client.close()

    if res.status != 200:
        raise HTTPError(data.decode(), res.status)

    return data
|
|
|
|
|
|
def clickhouse_execute(
    base_args,
    query,
    body=None,
    timeout=30,
    settings=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run ``query`` via clickhouse_execute_http() and return the stripped response."""
    response = clickhouse_execute_http(
        base_args,
        query,
        body,
        timeout,
        settings,
        max_http_retries=max_http_retries,
        retry_error_codes=retry_error_codes,
    )
    return response.strip()
|
|
|
|
|
|
def clickhouse_execute_json(
    base_args, query, timeout=60, settings=None, max_http_retries=5
):
    """Run ``query`` with the JSONEachRow default format and parse the result.

    Returns a list with one decoded JSON value per response line, or None
    when the response is empty.
    """
    data = clickhouse_execute_http(
        base_args,
        query,
        None,
        timeout,
        settings,
        "JSONEachRow",
        max_http_retries=max_http_retries,
    )
    if not data:
        return None
    return [json.loads(row) for row in data.strip().splitlines()]
|
|
|
|
|
|
def stop_tests():
    """Send SIGTERM to every process in this process's group.

    SIGTERM is ignored in this process while the signal is delivered and is
    reset to the default handler afterwards.
    """
    # send signal to all processes in group to avoid hung check triggering
    # (to avoid terminating clickhouse-test itself, the signal should be ignored)
    print("Sending signals")
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    own_group = os.getpgid(os.getpid())
    os.killpg(own_group, signal.SIGTERM)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    print("Sending signals DONE")
|
|
|
|
|
|
def get_db_engine(args, database_name):
    """Return the clause to append to CREATE DATABASE for the configured engine.

    Args:
        args: object with ``replicated_database`` and ``db_engine`` attributes.
        database_name: used in the path argument of the Replicated engine.

    Returns:
        An ``ON CLUSTER ... ENGINE=Replicated(...)`` clause when
        ``args.replicated_database`` is set, `` ENGINE=<db_engine>`` when
        ``args.db_engine`` is set, otherwise an empty string.
    """
    if args.replicated_database:
        # Built from adjacent literals rather than backslash continuations
        # inside one string, so the SQL does not pick up source indentation.
        # '{shard}' and '{replica}' are emitted literally.
        return (
            " ON CLUSTER test_cluster_database_replicated"
            f" ENGINE=Replicated('/test/clickhouse/db/{database_name}',"
            " '{shard}', '{replica}')"
        )
    if args.db_engine:
        return " ENGINE=" + args.db_engine
    return ""  # Will use default engine
|
|
|
|
|
|
def get_create_database_settings(args, testcase_args):
    """Build the settings dict passed along with CREATE DATABASE.

    ``log_comment`` is set to the test case's base name when ``testcase_args``
    is truthy; ``allow_deprecated_database_ordinary`` is set to 1 when
    ``args.db_engine`` is "Ordinary".
    """
    settings = {}
    if testcase_args:
        settings["log_comment"] = testcase_args.testcase_basename
    wants_ordinary = args.db_engine == "Ordinary"
    if wants_ordinary:
        settings["allow_deprecated_database_ordinary"] = 1
    return settings
|
|
|
|
|
|
def get_zookeeper_session_uptime(args):
    """Return zookeeperSessionUptime() as an int, or None if it cannot be obtained.

    With ``args.replicated_database`` the minimum over all replicas of
    ``test_cluster_database_replicated`` is queried. Any exception results in
    None.
    """
    try:
        if args.replicated_database:
            query = """
                SELECT min(materialize(zookeeperSessionUptime()))
                FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
                """
        else:
            query = "SELECT zookeeperSessionUptime()"
        return int(clickhouse_execute(args, query))
    except Exception:
        return None
|
|
|
|
|
|
def need_retry(args, stdout, stderr, total_time):
    """Decide whether a test run should be repeated.

    Returns True when ZooKeeper session checking is enabled and the session
    uptime is shorter than the test's run time, or when stdout or stderr
    contains one of MESSAGES_TO_RETRY.
    """
    if args.check_zookeeper_session:
        # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
        # instead of "Session expired" or "Connection loss"
        # Retry if session was expired during test execution.
        # If ZooKeeper is configured, then it's more reliable than checking stderr,
        # but the following condition is always true if ZooKeeper is not configured.
        session_uptime = get_zookeeper_session_uptime(args)
        if session_uptime is not None and session_uptime < math.ceil(total_time):
            return True

    def mentions_retryable_message(output):
        return any(msg in output for msg in MESSAGES_TO_RETRY)

    return mentions_retryable_message(stdout) or mentions_retryable_message(stderr)
|
|
|
|
|
|
def get_processlist_size(args):
    """Return the number of rows in system.processes, excluding this kind of query.

    Queries whose text contains ``system.processes`` are filtered out. With
    ``args.replicated_database`` all replicas of
    ``test_cluster_database_replicated`` are counted.
    """
    if args.replicated_database:
        query = """
            SELECT
                count()
            FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
            WHERE query NOT LIKE '%system.processes%'
            """
    else:
        query = """
            SELECT
                count()
            FROM system.processes
            WHERE query NOT LIKE '%system.processes%'
            """
    raw_count = clickhouse_execute(args, query).strip()
    return int(raw_count)
|
|
|
|
|
|
def get_processlist_with_stacktraces(args):
    """Return system.processes joined with per-thread stack traces, in Vertical format.

    Rows come from ``system.processes`` joined with ``system.stack_trace`` on
    ``query_id``, ordered by ``elapsed`` descending. With
    ``args.replicated_database`` the join runs on every replica of
    ``test_cluster_database_replicated``. Introspection functions are enabled
    for the query and the timeout is 120.
    """
    introspection_settings = {
        "allow_introspection_functions": 1,
    }

    if not args.replicated_database:
        local_query = """
            SELECT
                p.*,
                arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                    x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                    s.trace), '\n') AS stacktrace
                )) AS stacktraces
            FROM system.processes p
            JOIN system.stack_trace s USING (query_id)
            WHERE query NOT LIKE '%system.processes%'
            GROUP BY p.*
            ORDER BY elapsed DESC FORMAT Vertical
            """
        return clickhouse_execute(
            args,
            local_query,
            settings=introspection_settings,
            timeout=120,
        )

    cluster_query = """
        SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
        -- NOTE: view() here to do JOIN on shards, instead of initiator
        FROM clusterAllReplicas('test_cluster_database_replicated', view(
            SELECT
                p.*,
                arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                    x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                    s.trace), '\n') AS stacktrace
                )) AS stacktraces
            FROM system.processes p
            JOIN system.stack_trace s USING (query_id)
            WHERE query NOT LIKE '%system.processes%'
            GROUP BY p.*
        ))
        ORDER BY elapsed DESC FORMAT Vertical
        """
    return clickhouse_execute(
        args,
        cluster_query,
        settings=introspection_settings,
        timeout=120,
    )
|
|
|
|
|
|
def get_transactions_list(args):
    """Return the rows of system.transactions, or an error string on failure.

    With ``args.replicated_database`` every replica of
    ``test_cluster_database_replicated`` is queried and a ``host`` column is
    added. Any exception is turned into a "Cannot get list of transactions"
    message instead of being raised.
    """
    try:
        if not args.replicated_database:
            return clickhouse_execute_json(args, "select * from system.transactions")

        cluster_query = (
            "SELECT materialize((hostName(), tcpPort())) as host, * FROM "
            "clusterAllReplicas('test_cluster_database_replicated', system.transactions)"
        )
        return clickhouse_execute_json(args, cluster_query)
    except Exception as e:
        return f"Cannot get list of transactions: {e}"
|
|
|
|
|
|
def kill_gdb_if_any():
    """Send SIGTERM to running gdb processes, trying up to five times.

    Does nothing when ``pidof gdb`` reports no process. After a failed
    ``kill`` the function sleeps for the attempt index (0, 1, 2, ...) seconds
    before the next try.
    """
    # Check if we have running gdb.
    if subprocess.call("pidof gdb", shell=True) != 0:
        return

    for attempt in range(5):
        status = subprocess.call("kill -TERM $(pidof gdb)", shell=True, timeout=30)
        if status == 0:
            break
        sleep(attempt)
|
|
|
|
|
|
# collect server stacktraces using gdb
|
|
def get_stacktraces_from_gdb(server_pid):
    """Return the gdb backtrace of all threads of ``server_pid`` as text.

    Returns None, after printing the error, when anything goes wrong.
    """
    try:
        # We could attach gdb to clickhouse-server before running some tests
        # to print stacktraces of all crashes even if clickhouse cannot print it for some reason.
        # We should kill existing gdb if any before starting new one.
        kill_gdb_if_any()

        cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
        raw_output = subprocess.check_output(cmd, shell=True)
        return raw_output.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from gdb: {e}")
        return None
|
|
|
|
|
|
# collect server stacktraces from system.stack_trace table
|
|
def get_stacktraces_from_clickhouse(args):
    """Return the content of system.stack_trace obtained through the client binary.

    The query is run with ``args.client`` in a shell, with introspection
    functions enabled and unavailable shards skipped. With
    ``args.replicated_database`` all replicas of
    ``test_cluster_database_replicated`` are queried. Returns the combined
    stdout/stderr as text, or None (after printing the error) on failure.
    """
    settings_str = " ".join(
        [
            get_additional_client_options(args),
            "--allow_introspection_functions=1",
            "--skip_unavailable_shards=1",
        ]
    )
    command_prefix = f"{args.client} {settings_str} --query "

    # Symbolized trace column, shared by both queries.
    trace_str_column = (
        "arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
        "arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace_str "
    )

    replicated_query = (
        "SELECT materialize((hostName(), tcpPort())) as host, thread_name, thread_id, query_id, trace, "
        + trace_str_column
        + "FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
        "ORDER BY host, thread_id FORMAT Vertical"
    )
    local_query = (
        "SELECT thread_name, thread_id, query_id, trace, "
        + trace_str_column
        + "FROM system.stack_trace FORMAT Vertical"
    )

    try:
        query = replicated_query if args.replicated_database else local_query
        raw_output = subprocess.check_output(
            command_prefix + '"' + query + '"',
            shell=True,
            stderr=subprocess.STDOUT,
        )
        return raw_output.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
        return None
|
|
|
|
|
|
def print_stacktraces() -> None:
    """Print server stack traces, preferring gdb and falling back to the client.

    gdb is used when the server PID is known and the run is not against a
    replicated database. If gdb fails or returns fewer than 1000 characters,
    the traces are collected with get_stacktraces_from_clickhouse() instead.
    When neither source yields anything, an error message is printed.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(
            f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}"
        )
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None on failure; len(None) would
        # raise TypeError instead of falling through to the client.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(args)

    if bt is not None:
        print(bt)
        return

    print(
        colored(
            f"\nUnable to locate ClickHouse server process listening at TCP port "
            f"{args.tcp_port}. It must have crashed or exited prematurely!",
            args,
            "red",
            attrs=["bold"],
        )
    )
|
|
|
|
|
|
def get_server_pid():
    """Return the PID of the ClickHouse server, or None if it cannot be found.

    ``lsof`` on ``args.tcp_port`` is tried first, then ``pidof``. A command
    that fails, or whose output is not a single integer, is reported and the
    next one is tried.
    """
    lookup_commands = (
        # lsof does not work in stress tests for some reason
        f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | sed 's/^p//p;d'",
        "pidof -s clickhouse-server",
    )
    output = None

    for cmd in lookup_commands:
        try:
            output = subprocess.check_output(
                cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True
            )
            if output:
                return int(output)
        except Exception as e:
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
|
|
|
|
|
|
def colored(text, args, color=None, on_color=None, attrs=None):
    """Return ``text`` wrapped by termcolor when colors are usable, else unchanged.

    Colors are applied only if termcolor was imported and stdout is a TTY or
    ``args.force_color`` is set.
    """
    use_colors = termcolor and (sys.stdout.isatty() or args.force_color)
    if not use_colors:
        return text
    return termcolor.colored(text, color, on_color, attrs)
|
|
|
|
|
|
class TestStatus(enum.Enum):
    """Outcome of a single test run; each member's value equals its name."""

    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    OK = "OK"
    SKIPPED = "SKIPPED"
|
|
|
|
|
|
class FailureReason(enum.Enum):
    """Reason attached to a test result; values are the texts shown to the user.

    Members are grouped by the TestStatus they accompany.
    """

    # --- reasons for FAIL ---
    TIMEOUT = "Timeout!"
    SERVER_DIED = "server died"
    EXIT_CODE = "return code: "
    STDERR = "having stderror: "
    EXCEPTION = "having exception in stdout: "
    RESULT_DIFF = "result differs with reference: "
    TOO_LONG = (
        f"Test runs too long (> {TEST_MAX_RUN_TIME_IN_SECONDS}s). Make it faster."
    )
    INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

    # --- reasons for SKIPPED ---
    NOT_SUPPORTED_IN_CLOUD = "not supported in cloud environment"
    NOT_SUPPORTED_IN_PRIVATE = "not supported in private build"
    DISABLED = "disabled"
    SKIP = "skip"
    NO_JINJA = "no jinja"
    NO_ZOOKEEPER = "no zookeeper"
    NO_SHARD = "no shard"
    FAST_ONLY = "running fast tests only"
    NO_LONG = "not running long tests"
    REPLICATED_DB = "replicated-database"
    NON_ATOMIC_DB = "database engine not Atomic"
    OBJECT_STORAGE = "object-storage"
    S3_STORAGE = "s3-storage"
    AZURE_BLOB_STORAGE = "azure-blob-storage"
    BUILD = "not running for current build"
    NO_PARALLEL_REPLICAS = "smth in not supported with parallel replicas"
    SHARED_MERGE_TREE = "no-shared-merge-tree"
    DISTRIBUTED_CACHE = "distributed-cache"

    # --- reasons for UNKNOWN ---
    NO_REFERENCE = "no reference file"
    INTERNAL_ERROR = "Test internal error: "
|
|
|
|
|
|
def threshold_generator(always_on_prob, always_off_prob, min_val, max_val):
    """Build a zero-argument generator of random threshold values.

    Each call of the returned function draws one random number ``r`` in
    [0, 1) and returns:
      * ``min_val`` if ``r <= always_on_prob``;
      * ``max_val`` if ``r <= always_on_prob + always_off_prob``;
      * otherwise a random value between ``min_val`` and ``max_val``: an int
        (inclusive bounds) when both bounds are ints, else a float.
    """
    integer_bounds = isinstance(min_val, int) and isinstance(max_val, int)

    def gen():
        roll = random.random()
        if roll <= always_on_prob:
            return min_val
        if roll <= always_on_prob + always_off_prob:
            return max_val

        draw = random.randint if integer_bounds else random.uniform
        return draw(min_val, max_val)

    return gen
|
|
|
|
|
|
# To keep dependency list as short as possible, tzdata is not used here (to
|
|
# avoid try/except block for import)
|
|
def get_localzone():
    """Return the local time zone name.

    The ``TZ`` environment variable wins when it is set (even if empty).
    Otherwise the name is the last two path components of the target of the
    ``/etc/localtime`` symlink.

    Raises:
        OSError: ``TZ`` is unset and ``/etc/localtime`` cannot be read as a
            symlink.
    """
    tz = os.getenv("TZ")
    if tz is not None:
        return tz
    # Resolved only as a fallback: passing it as the getenv() default would
    # evaluate it, and possibly raise, even when TZ is set.
    return "/".join(os.readlink("/etc/localtime").split("/")[-2:])
|
|
|
|
|
|
class SettingsRandomizer:
    """Table of query-level settings and how to randomize each of them.

    ``settings`` maps a setting name to a zero-argument callable producing a
    random value for it.
    """

    settings = {
        "max_insert_threads": lambda: (
            12 if random.random() < 0.03 else random.randint(1, 3)
        ),
        "group_by_two_level_threshold": threshold_generator(0.2, 0.2, 1, 1000000),
        "group_by_two_level_threshold_bytes": threshold_generator(
            0.2, 0.2, 1, 50000000
        ),
        "distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
        "fsync_metadata": lambda: random.randint(0, 1),
        "output_format_parallel_formatting": lambda: random.randint(0, 1),
        "input_format_parallel_parsing": lambda: random.randint(0, 1),
        "min_chunk_bytes_for_parallel_parsing": lambda: max(
            1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))
        ),
        "max_read_buffer_size": lambda: random.randint(500000, 1048576),
        "prefer_localhost_replica": lambda: random.randint(0, 1),
        "max_block_size": lambda: random.randint(8000, 100000),
        "max_joined_block_size_rows": lambda: random.randint(8000, 100000),
        "max_threads": lambda: 32 if random.random() < 0.03 else random.randint(1, 3),
        "optimize_append_index": lambda: random.randint(0, 1),
        "optimize_if_chain_to_multiif": lambda: random.randint(0, 1),
        "optimize_if_transform_strings_to_enum": lambda: random.randint(0, 1),
        "optimize_read_in_order": lambda: random.randint(0, 1),
        "optimize_or_like_chain": lambda: random.randint(0, 1),
        "optimize_substitute_columns": lambda: random.randint(0, 1),
        "enable_multiple_prewhere_read_steps": lambda: random.randint(0, 1),
        "read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
        "optimize_aggregation_in_order": lambda: random.randint(0, 1),
        "aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
        "use_uncompressed_cache": lambda: random.randint(0, 1),
        "min_bytes_to_use_direct_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "min_bytes_to_use_mmap_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "local_filesystem_read_method": lambda: random.choice(
            # Allow to use uring only when running on Linux
            ["read", "pread", "mmap", "pread_threadpool", "io_uring"]
            if platform.system().lower() == "linux"
            else ["read", "pread", "mmap", "pread_threadpool"]
        ),
        "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
        "local_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "filesystem_cache_segments_batch_size": lambda: random.choice([0, 3, 10, 50]),
        "read_from_filesystem_cache_if_exists_otherwise_bypass_cache": lambda: random.randint(
            0, 1
        ),
        "throw_on_error_from_cache_on_write_operations": lambda: random.randint(0, 1),
        "remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "allow_prefetched_read_pool_for_remote_filesystem": lambda: random.randint(
            0, 1
        ),
        "filesystem_prefetch_max_memory_usage": lambda: random.choice(
            ["32Mi", "64Mi", "128Mi"]
        ),
        "filesystem_prefetches_limit": lambda: random.choice(
            [0, 10]
        ),  # 0 means unlimited (but anyway limited by prefetch_max_memory_usage)
        "filesystem_prefetch_min_bytes_for_single_read_task": lambda: random.choice(
            ["1Mi", "8Mi", "16Mi"]
        ),
        "filesystem_prefetch_step_marks": lambda: random.choice(
            [0, 50]
        ),  # 0 means 'auto'
        "filesystem_prefetch_step_bytes": lambda: random.choice(
            [0, "100Mi"]
        ),  # 0 means 'auto'
        # "compile_expressions": lambda: random.randint(0, 1), - this setting has a bug: https://github.com/ClickHouse/ClickHouse/issues/51264
        "compile_aggregate_expressions": lambda: random.randint(0, 1),
        "compile_sort_description": lambda: random.randint(0, 1),
        "merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
        "optimize_distinct_in_order": lambda: random.randint(0, 1),
        "max_bytes_before_external_sort": threshold_generator(
            0.3, 0.5, 0, 10 * 1024 * 1024 * 1024
        ),
        "max_bytes_before_external_group_by": threshold_generator(
            0.3, 0.5, 0, 10 * 1024 * 1024 * 1024
        ),
        "max_bytes_before_remerge_sort": lambda: random.randint(1, 3000000000),
        "min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "merge_tree_compact_parts_min_granules_to_multibuffer_read": lambda: random.randint(
            1, 128
        ),
        "optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
        "http_response_buffer_size": lambda: random.randint(0, 10 * 1048576),
        "http_wait_end_of_query": lambda: random.random() > 0.5,
        "enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(
            0, 1
        ),
        "min_count_to_compile_expression": lambda: random.choice([0, 3]),
        "min_count_to_compile_aggregate_expression": lambda: random.choice([0, 3]),
        "min_count_to_compile_sort_description": lambda: random.choice([0, 3]),
        "session_timezone": lambda: random.choice(
            [
                # special non-deterministic around 1970 timezone, see [1].
                #
                #   [1]: https://github.com/ClickHouse/ClickHouse/issues/42653
                "America/Mazatlan",
                "America/Hermosillo",
                "Mexico/BajaSur",
                # These timezones had DST transitions on some unusual dates (e.g. 2000-01-15 12:00:00).
                "Africa/Khartoum",
                "Africa/Juba",
                # server default that is randomized across all timezones
                # NOTE: due to lots of trickery we cannot use empty timezone here, but this should be the same.
                get_localzone(),
            ]
        ),
        # This setting affect part names and their content which can be read from tables in tests.
        # We have a lot of tests which relies on part names, so it's very unsafe to enable randomization
        # of this setting
        # "prefer_warmed_unmerged_parts_seconds": lambda: random.randint(0, 10),
        "use_page_cache_for_disks_without_file_cache": lambda: random.random() < 0.7,
        "page_cache_inject_eviction": lambda: random.random() < 0.5,
        "merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability": lambda: round(
            random.random(), 2
        ),
        "prefer_external_sort_block_bytes": lambda: random.choice([0, 1, 100000000]),
        "cross_join_min_rows_to_compress": lambda: random.choice([0, 1, 100000000]),
        "cross_join_min_bytes_to_compress": lambda: random.choice([0, 1, 100000000]),
        "min_external_table_block_size_bytes": lambda: random.choice([0, 1, 100000000]),
        "max_parsing_threads": lambda: random.choice([0, 1, 10]),
        "optimize_functions_to_subcolumns": lambda: random.randint(0, 1),
    }

    @staticmethod
    def get_random_settings(args):
        """Return a dict with one freshly generated value per entry of ``settings``.

        When ``args.build_flags`` contains ``BuildFlags.DEBUG``,
        ``allow_prefetched_read_pool_for_remote_filesystem`` is fixed to 0 and
        its generator is not called.
        """
        debug_build = BuildFlags.DEBUG in args.build_flags
        generated = {}
        for name, make_value in SettingsRandomizer.settings.items():
            pinned_off = (
                debug_build
                and name == "allow_prefetched_read_pool_for_remote_filesystem"
            )
            generated[name] = 0 if pinned_off else make_value()
        return generated
|
|
|
|
|
|
class MergeTreeSettingsRandomizer:
    """Table of MergeTree-level settings and how to randomize each of them.

    ``settings`` maps a setting name to a zero-argument callable producing a
    random value for it.
    """

    settings = {
        "ratio_of_defaults_for_sparse_serialization": threshold_generator(
            0.3, 0.5, 0.0, 1.0
        ),
        "prefer_fetch_merged_part_size_threshold": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "vertical_merge_algorithm_min_rows_to_activate": threshold_generator(
            0.4, 0.4, 1, 1000000
        ),
        "vertical_merge_algorithm_min_columns_to_activate": threshold_generator(
            0.4, 0.4, 1, 100
        ),
        "allow_vertical_merges_from_compact_to_wide_parts": lambda: random.randint(
            0, 1
        ),
        "min_merge_bytes_to_use_direct_io": threshold_generator(
            0.25, 0.25, 1, 10 * 1024 * 1024 * 1024
        ),
        "index_granularity_bytes": lambda: random.randint(1024, 30 * 1024 * 1024),
        "merge_max_block_size": lambda: random.randint(1, 8192 * 3),
        "index_granularity": lambda: random.randint(1, 65536),
        "min_bytes_for_wide_part": threshold_generator(0.3, 0.3, 0, 1024 * 1024 * 1024),
        "compress_marks": lambda: random.randint(0, 1),
        "compress_primary_key": lambda: random.randint(0, 1),
        "marks_compress_block_size": lambda: random.randint(8000, 100000),
        "primary_key_compress_block_size": lambda: random.randint(8000, 100000),
        "replace_long_file_name_to_hash": lambda: random.randint(0, 1),
        "max_file_name_length": threshold_generator(0.3, 0.3, 0, 128),
        "min_bytes_for_full_part_storage": threshold_generator(
            0.3, 0.3, 0, 512 * 1024 * 1024
        ),
        "compact_parts_max_bytes_to_buffer": lambda: random.randint(
            1024, 512 * 1024 * 1024
        ),
        "compact_parts_max_granules_to_buffer": threshold_generator(0.15, 0.15, 1, 256),
        "compact_parts_merge_max_bytes_to_prefetch_part": lambda: random.randint(
            1, 32 * 1024 * 1024
        ),
        "cache_populated_by_fetch": lambda: random.randint(0, 1),
        "concurrent_part_removal_threshold": threshold_generator(0.2, 0.3, 0, 100),
        "old_parts_lifetime": threshold_generator(0.2, 0.3, 10, 8 * 60),
    }

    @staticmethod
    def get_random_settings(args):
        """Return random values for every setting not in ``args.changed_merge_tree_settings``.

        Generators of the excluded settings are not called.
        """
        return {
            name: make_value()
            for name, make_value in MergeTreeSettingsRandomizer.settings.items()
            if name not in args.changed_merge_tree_settings
        }
|
|
|
|
|
|
def replace_in_file(filename, what, with_what):
    """Replace every match of ``what`` with ``with_what`` in ``filename``, in place.

    The substitution is done by ``sed -i`` with the script
    ``s|what|with_what|g`` under ``LC_ALL=C``. ``what`` is therefore a sed
    regular expression, and ``|`` is the delimiter. The exit status of sed is
    ignored.

    Args:
        filename: path of the file to edit; passed to sed as a single argument,
            so spaces and shell metacharacters in it are safe.
        what: sed pattern to search for.
        with_what: sed replacement text.
    """
    # An argument list (no shell) keeps the filename and the patterns intact
    # whatever quotes or spaces they contain.
    subprocess.run(
        ["sed", "-i", "-e", f"s|{what}|{with_what}|g", "--", filename],
        env={**os.environ, "LC_ALL": "C"},
        check=False,
    )
|
|
|
|
|
|
class TestResult:
    """Result of running one test case."""

    def __init__(
        self,
        case_name: str,
        status: TestStatus,
        reason: Optional[FailureReason],
        total_time: float,
        description: str,
    ):
        """Store the given fields; ``need_retry`` starts as False."""
        self.case_name: str = case_name
        self.status: TestStatus = status
        self.reason: Optional[FailureReason] = reason
        self.total_time: float = total_time
        self.description: str = description
        self.need_retry: bool = False

    def check_if_need_retry(self, args, stdout, stderr, runs_count):
        """Set ``self.need_retry`` to True when the failed run should be repeated.

        That is the case when the status is FAIL, need_retry() agrees, and
        ``runs_count`` does not exceed MAX_RETRIES. Otherwise the flag is
        left untouched.
        """
        # The checks run in this order so that need_retry() is only called for
        # failed tests.
        if self.status != TestStatus.FAIL:
            return
        if not need_retry(args, stdout, stderr, self.total_time):
            return
        if MAX_RETRIES < runs_count:
            return
        self.need_retry = True
|
|
|
|
|
|
class TestCase:
|
|
@staticmethod
|
|
def get_description_from_exception_info(exc_info):
|
|
exc_type, exc_value, tb = exc_info
|
|
exc_name = exc_type.__name__
|
|
traceback_str = "\n".join(traceback.format_tb(tb, 10))
|
|
description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
|
|
return description
|
|
|
|
@staticmethod
|
|
def get_reference_file(suite_dir, name):
|
|
"""
|
|
Returns reference file name for specified test
|
|
"""
|
|
|
|
name = removesuffix(name, ".gen")
|
|
for ext in [".reference", ".gen.reference"]:
|
|
reference_file = os.path.join(suite_dir, name) + ext
|
|
if os.path.isfile(reference_file):
|
|
return reference_file
|
|
return None
|
|
|
|
@staticmethod
|
|
def configure_testcase_args(args, case_file, suite_tmp_dir):
    """Prepare the per-test copy of ``args`` and the test's database/tmp dir.

    A deep copy of ``args`` is extended with ``testcase_start_time``,
    ``testcase_client``, ``testcase_basename``, ``testcase_database``,
    ``test_tmp_dir`` and ``debug_log_file``.

    With ``--database`` given, that database and ``suite_tmp_dir`` are used and
    the environment variables are only filled in when not already set.
    Otherwise a database with a random name is created on the server, a
    matching temporary directory is created, and CLICKHOUSE_DATABASE /
    CLICKHOUSE_TMP are overwritten.

    Returns:
        the configured copy of ``args``.
    """
    testcase_args = copy.deepcopy(args)

    testcase_args.testcase_start_time = datetime.now()
    testcase_basename = os.path.basename(case_file)
    testcase_args.testcase_client = (
        f"{testcase_args.client} --log_comment '{testcase_basename}'"
    )
    testcase_args.testcase_basename = testcase_basename

    if not testcase_args.database:
        # If --database is not specified, we will create temporary database with
        # unique name and we will recreate and drop it for each test
        alphabet = string.ascii_lowercase + string.digits
        # NOTE: it is important not to use default random generator, since it shares state.
        suffix = "".join(random.SystemRandom().choice(alphabet) for _ in range(8))
        database = f"test_{suffix}"

        create_query = (
            "CREATE DATABASE IF NOT EXISTS "
            + database
            + get_db_engine(testcase_args, database)
        )
        clickhouse_execute(
            args,
            create_query,
            settings=get_create_database_settings(args, testcase_args),
        )

        os.environ["CLICKHOUSE_DATABASE"] = database
        # Set temporary directory to match the randomly generated database,
        # because .sh tests also use it for temporary files and we want to avoid
        # collisions.
        testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
        os.mkdir(testcase_args.test_tmp_dir)
        os.environ["CLICKHOUSE_TMP"] = testcase_args.test_tmp_dir
    else:
        database = testcase_args.database
        os.environ.setdefault("CLICKHOUSE_DATABASE", database)
        os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
        testcase_args.test_tmp_dir = suite_tmp_dir

    testcase_args.testcase_database = database

    # Printed only in case of failures
    #
    # NOTE: here we use "CLICKHOUSE_TMP" instead of "file_suffix",
    # so it is installed in configure_testcase_args() unlike other files
    # (stdout_file, stderr_file) in TestCase::__init__().
    # Since using CLICKHOUSE_TMP is easier to use in expect.
    debug_log_base = os.path.join(testcase_args.test_tmp_dir, testcase_basename)
    testcase_args.debug_log_file = debug_log_base + ".debuglog"

    return testcase_args
|
|
|
|
@staticmethod
|
|
def cli_format_settings(settings_list) -> str:
    """Render a settings mapping as space-separated ``--name value`` options.

    Values are converted with ``str()``; an empty mapping gives "".
    """
    return " ".join(
        f"--{name} {value!s}" for name, value in settings_list.items()
    )
|
|
|
|
@staticmethod
|
|
def http_format_settings(settings_list) -> str:
    """URL-encode a settings mapping into a query string (``a=1&b=2``)."""
    query_string = urllib.parse.urlencode(settings_list)
    return query_string
|
|
|
|
def has_show_create_table_in_test(self):
    """Tell whether the test file contains "show create" (case-insensitive).

    The search is done by ``grep -iq``; only exit status 0 (a match) counts as
    True, so "no match" and grep errors both give False.
    """
    grep_status = subprocess.call(["grep", "-iq", "show create", self.case_file])
    return grep_status == 0
|
|
|
|
def add_random_settings(self, client_options):
    """Append the randomized settings to the client options and environment.

    When query settings are randomized they are exported through
    CLICKHOUSE_URL_PARAMS (after the base URL params, if any) and added as
    command-line options.  Randomized MergeTree settings are added after
    ``--allow_merge_tree_settings``.  If anything was added,
    ``--allow_repeated_settings`` is appended as well.

    CLICKHOUSE_CLIENT_OPT is always rewritten to the base client options plus
    the additions and a trailing space.

    Returns:
        ``client_options`` with the additional options appended.
    """
    additions = []

    if self.randomize_settings:
        url_params = self.http_format_settings(self.random_settings)
        if self.base_url_params:
            url_params = self.base_url_params + "&" + url_params
        os.environ["CLICKHOUSE_URL_PARAMS"] = url_params

        additions.append(f" {self.cli_format_settings(self.random_settings)}")

    if self.randomize_merge_tree_settings:
        additions.append(
            f" --allow_merge_tree_settings {self.cli_format_settings(self.merge_tree_random_settings)}"
        )

    if additions:
        additions.append(" --allow_repeated_settings")

    new_options = "".join(additions)
    os.environ["CLICKHOUSE_CLIENT_OPT"] = self.base_client_options + new_options + " "

    return client_options + new_options
|
|
|
|
def remove_random_settings_from_env(self):
    """Reset CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT to the base values
    stored on this object (``base_url_params`` / ``base_client_options``)."""
    restored = (
        ("CLICKHOUSE_URL_PARAMS", "base_url_params"),
        ("CLICKHOUSE_CLIENT_OPT", "base_client_options"),
    )
    for variable, attribute in restored:
        os.environ[variable] = getattr(self, attribute)
|
|
|
|
def add_info_about_settings(self, description):
    """Return ``description`` followed by the randomized settings that were used.

    A line is added for the query settings and/or the MergeTree settings,
    depending on which kinds were randomized; the result always ends with a
    newline.
    """
    pieces = [description]
    if self.randomize_settings:
        pieces.append(
            f"\nSettings used in the test: {self.cli_format_settings(self.random_settings)}"
        )
    if self.randomize_merge_tree_settings:
        pieces.append(
            f"\n\nMergeTree settings used in test: {self.cli_format_settings(self.merge_tree_random_settings)}"
        )
    pieces.append("\n")
    return "".join(pieces)
|
|
|
|
def __init__(self, suite, case: str, args, is_concurrent: bool):
    """Collect everything needed to run one test case.

    Args:
        suite: suite object; ``all_tags``, ``suite_path`` and
            ``suite_tmp_path`` are read from it.
        case: test file name inside the suite directory.
        args: parsed runner options.
        is_concurrent: when true and ``args.test_runs > 1``, the current pid
            is inserted into the stdout/stderr file names.

    The values of CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT at
    construction time are remembered as the "base" values.
    """
    self.case: str = case  # case file name
    # NOTE: for a tagged test this is the very set stored in suite.all_tags
    # (not a copy), so tags added below are visible through the suite too.
    self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set()

    # "".split(",") yields [""], so blank entries have to be skipped;
    # otherwise an unset GLOBAL_TAGS would add an empty tag to every test.
    for tag in os.getenv("GLOBAL_TAGS", "").split(","):
        tag = tag.strip()
        if tag:
            self.tags.add(tag)

    self.case_file: str = os.path.join(suite.suite_path, case)
    (self.name, self.ext) = os.path.splitext(case)

    file_suffix = f".{os.getpid()}" if is_concurrent and args.test_runs > 1 else ""
    self.reference_file = self.get_reference_file(suite.suite_path, self.name)
    output_base = os.path.join(suite.suite_tmp_path, self.name) + file_suffix
    self.stdout_file = output_base + ".stdout"
    self.stderr_file = output_base + ".stderr"

    # Both are filled in by run().
    self.testcase_args = None
    self.runs_count = 0

    has_no_random_settings_tag = "no-random-settings" in self.tags

    self.randomize_settings = not (
        args.no_random_settings or has_no_random_settings_tag
    )

    has_no_random_merge_tree_settings_tag = (
        "no-random-merge-tree-settings" in self.tags
    )

    # If test contains SHOW CREATE TABLE do not
    # randomize merge tree settings, because
    # they will be added to table definition and test will fail
    self.randomize_merge_tree_settings = not (
        args.no_random_merge_tree_settings
        or has_no_random_settings_tag
        or has_no_random_merge_tree_settings_tag
        or self.has_show_create_table_in_test()
    )

    # The random_* attributes exist only when the matching flag is set.
    if self.randomize_settings:
        self.random_settings = SettingsRandomizer.get_random_settings(args)

    if self.randomize_merge_tree_settings:
        self.merge_tree_random_settings = (
            MergeTreeSettingsRandomizer.get_random_settings(args)
        )

    self.base_url_params = os.environ.get("CLICKHOUSE_URL_PARAMS", "")
    self.base_client_options = os.environ.get("CLICKHOUSE_CLIENT_OPT", "")

    self.show_whitespaces_in_diff = args.show_whitespaces_in_diff
|
|
|
|
# Returns the reason why the test must be skipped, or None if it should be run
|
|
def should_skip_test(self, suite) -> Optional[FailureReason]:
    """Decide whether this test has to be skipped in the current configuration.

    The checks are evaluated in a fixed order and the first one that matches
    determines the reason.  They combine the test's tags, the suite's skip
    lists, a ``<name>.disabled`` marker file and the global ``args`` options.

    Returns:
        the FailureReason explaining the skip, or None when the test should run.
    """
    tags = self.tags

    def tagged(*names):
        # True when the test carries at least one of the given tags.
        return any(name in tags for name in names)

    if tagged("disabled") and not args.disabled:
        return FailureReason.DISABLED

    if args.private and self.name in suite.private_skip_list:
        return FailureReason.NOT_SUPPORTED_IN_PRIVATE

    if args.cloud and tagged("no-replicated-database"):
        return FailureReason.REPLICATED_DB

    if args.cloud and self.name in suite.cloud_skip_list:
        return FailureReason.NOT_SUPPORTED_IN_CLOUD

    if (
        os.path.exists(os.path.join(suite.suite_path, self.name) + ".disabled")
        and not args.disabled
    ):
        return FailureReason.DISABLED

    if tagged("no-parallel-replicas") and args.no_parallel_replicas:
        return FailureReason.NO_PARALLEL_REPLICAS

    if args.skip and any(s in self.name for s in args.skip):
        return FailureReason.SKIP

    if not USE_JINJA and self.ext.endswith("j2"):
        return FailureReason.NO_JINJA

    if tagged("zookeeper", "replica") and not args.zookeeper:
        return FailureReason.NO_ZOOKEEPER

    if tagged("shard", "distributed", "global") and not args.shard:
        return FailureReason.NO_SHARD

    if tagged("no-fasttest") and args.fast_tests_only:
        return FailureReason.FAST_ONLY

    if tagged("long", "deadlock", "race") and args.no_long:
        # Tests for races and deadlocks usually are run in a loop for a significant amount of time
        return FailureReason.NO_LONG

    if tagged("no-replicated-database") and args.replicated_database:
        return FailureReason.REPLICATED_DB

    if tagged("no-distributed-cache") and args.distributed_cache:
        return FailureReason.DISTRIBUTED_CACHE

    if tagged("atomic-database") and (
        args.replicated_database or args.db_engine not in (None, "Atomic")
    ):
        return FailureReason.NON_ATOMIC_DB

    if tagged("no-shared-merge-tree") and args.replace_replicated_with_shared:
        return FailureReason.SHARED_MERGE_TREE

    if tagged("no-s3-storage") and args.s3_storage:
        return FailureReason.S3_STORAGE

    if tagged("no-azure-blob-storage") and args.azure_blob_storage:
        return FailureReason.AZURE_BLOB_STORAGE

    if tagged("no-object-storage") and (args.azure_blob_storage or args.s3_storage):
        return FailureReason.OBJECT_STORAGE

    if (
        tagged("no-object-storage-with-slow-build")
        and (args.s3_storage or args.azure_blob_storage)
        and BuildFlags.RELEASE not in args.build_flags
    ):
        return FailureReason.OBJECT_STORAGE

    if tagged("no-batch") and (
        args.run_by_hash_num is not None or args.run_by_hash_total is not None
    ):
        return FailureReason.SKIP

    if tags:
        # "no-<flag>" excludes the test on builds having that flag ...
        for build_flag in args.build_flags:
            if "no-" + build_flag in tags:
                return FailureReason.BUILD
        # ... and "use-<something>" requires the matching build flag.
        for tag in tags:
            tag = tag.replace("-", "_")
            if tag.startswith("use_") and tag not in args.build_flags:
                return FailureReason.BUILD

    return None
|
|
|
|
def process_result_impl(
    self, proc, stdout: str, stderr: str, debug_log: str, total_time: float
):
    """Turn the raw outcome of a test process into a TestResult.

    The checks run in this order and the first match decides the result:
    process still running (TIMEOUT, the process group is killed), non-zero
    exit code (EXIT_CODE or SERVER_DIED), non-empty stderr (STDERR),
    "Exception" in stdout (EXCEPTION), "@@SKIP@@" in stdout (SKIPPED),
    missing reference file (UNKNOWN / NO_REFERENCE), stdout differing from
    the reference (RESULT_DIFF), and, with several test runs, exceeding
    TEST_MAX_RUN_TIME_IN_SECONDS without the "long" tag (TOO_LONG).

    On success the stdout, stderr and debug log files are removed and an OK
    result is returned.  Only the first 100 lines of ``debug_log`` are used.
    """
    description = ""

    if debug_log:
        debug_log = "\n".join(debug_log.splitlines()[:100])

    def with_debug_log(text):
        # Append the (already truncated) debug log on a new line, if there is one.
        return text + "\n" + debug_log if debug_log else text

    def failure(reason, text):
        return TestResult(self.name, TestStatus.FAIL, reason, total_time, text)

    if proc:
        if proc.returncode is None:
            try:
                pgid = os.getpgid(proc.pid)
                # NOTE: this still may leave some processes, that had been
                # created by timeout(1), since it also creates new process
                # group. But this should not be a problem with default
                # options, since the default time for each test is 10min,
                # and this is way more bigger then the timeout for each
                # timeout(1) invocation.
                #
                # But as a workaround we are sending SIGTERM first, and
                # only after SIGKILL, that way timeout(1) will have an
                # ability to terminate childrens (though not always since
                # signals are asynchronous).
                os.killpg(pgid, signal.SIGTERM)
                # This may not be enough, but this is at least something
                # (and anyway it is OK to spend 0.1 second more in case of
                # test timeout).
                sleep(0.1)
                os.killpg(pgid, signal.SIGKILL)
            except OSError as e:
                # The process (group) being gone already is fine.
                if e.errno != ESRCH:
                    raise

            if stderr:
                description += stderr
            return failure(FailureReason.TIMEOUT, with_debug_log(description))

        if proc.returncode != 0:
            reason = FailureReason.EXIT_CODE
            description += str(proc.returncode)

            if stderr:
                description += "\n"
                description += stderr
            description = with_debug_log(description)

            # Stop on fatal errors like segmentation fault. They are sent to client via logs.
            if " <Fatal> " in stderr:
                reason = FailureReason.SERVER_DIED

            connection_lost = (
                "Connection refused" in stderr
                or "Attempt to read after eof" in stderr
            )
            if (
                self.testcase_args.stop
                and connection_lost
                and "Received exception from server" not in stderr
            ):
                reason = FailureReason.SERVER_DIED

            if os.path.isfile(self.stdout_file):
                description += ", result:\n\n"
                with open(self.stdout_file, "rb") as f:
                    description += trim_for_log(
                        f.read().decode("utf-8", errors="ignore")
                    )
                description += "\n"

            description += f"\nstdout:\n{stdout}\n"
            return failure(reason, description)

    if stderr:
        description += "\n"
        description += trim_for_log(stderr)
        description += "\n"
        description += "\nstdout:\n"
        description += trim_for_log(stdout)
        description += "\n"
        return failure(FailureReason.STDERR, with_debug_log(description))

    if "Exception" in stdout:
        description += "\n{}\n".format("\n".join(stdout.splitlines()[:100]))
        return failure(FailureReason.EXCEPTION, with_debug_log(description))

    if "@@SKIP@@" in stdout:
        skip_reason = stdout.replace("@@SKIP@@", "").rstrip("\n")
        description += " - "
        description += skip_reason
        return TestResult(
            self.name,
            TestStatus.SKIPPED,
            FailureReason.SKIP,
            total_time,
            description,
        )

    if self.reference_file is None:
        return TestResult(
            self.name,
            TestStatus.UNKNOWN,
            FailureReason.NO_REFERENCE,
            total_time,
            description,
        )

    # Quick comparison first; the readable diff is produced only on mismatch.
    result_is_different = subprocess.call(
        ["diff", "-q", self.reference_file, self.stdout_file], stdout=PIPE
    )

    if result_is_different:
        diff_command = [
            "diff",
            "-U",
            str(self.testcase_args.unified),
            self.reference_file,
            self.stdout_file,
        ]
        with Popen(diff_command, stdout=PIPE, universal_newlines=True) as diff_proc:
            if self.show_whitespaces_in_diff:
                # Mark trailing whitespace by appending "$" to such lines.
                with Popen(
                    ["sed", "-e", "s/[ \t]\\+$/&$/g"],
                    stdin=diff_proc.stdout,
                    stdout=PIPE,
                ) as sed_proc:
                    diff = sed_proc.communicate()[0].decode(
                        "utf-8", errors="ignore"
                    )
            else:
                diff = diff_proc.communicate()[0]

        if diff.startswith("Binary files "):
            diff += "Content of stdout:\n===================\n"
            with open(self.stdout_file, "rb") as file:
                diff += str(file.read())
            diff += "==================="
        description += f"\n{diff}\n"
        return failure(FailureReason.RESULT_DIFF, with_debug_log(description))

    if (
        self.testcase_args.test_runs > 1
        and total_time > TEST_MAX_RUN_TIME_IN_SECONDS
        and "long" not in self.tags
    ):
        # We're in Flaky Check mode, check the run time as well while we're at it.
        return failure(FailureReason.TOO_LONG, with_debug_log(description))

    if os.path.exists(self.stdout_file):
        os.remove(self.stdout_file)
    if os.path.exists(self.stderr_file):
        os.remove(self.stderr_file)
    if os.path.exists(self.testcase_args.debug_log_file):
        os.remove(self.testcase_args.debug_log_file)

    return TestResult(self.name, TestStatus.OK, None, total_time, description)
|
|
|
|
@staticmethod
|
|
def print_test_time(test_time) -> str:
    """Format the elapsed time as " <seconds> sec." with two decimals.

    Returns "" when the global ``args.print_time`` is falsy.
    """
    return f" {test_time:.2f} sec." if args.print_time else ""
|
|
|
|
def process_result(self, result: TestResult, messages):
    """Build the final, printable description of ``result``.

    The text consists of the status message from ``messages``, the run time,
    the failure reason (if any) and the description produced so far.  When
    per-test coverage collection is enabled on a coverage build, coverage
    data is also inserted into ``system.coverage_log`` (failures to insert
    are only printed) and the coverage size is appended.  For failed tests
    the database name is added.

    Returns:
        the same ``result`` object with its ``description`` replaced.
    """
    pieces = [messages[result.status], self.print_test_time(result.total_time)]
    if result.reason is not None:
        pieces.append(f"\nReason: {result.reason.value} ")

    pieces.append(result.description)

    if (
        args.collect_per_test_coverage
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    ):
        try:
            clickhouse_execute(
                args,
                f"INSERT INTO system.coverage_log SELECT now(), '{self.case}', coverageCurrent()",
                retry_error_codes=True,
            )
        except Exception as e:
            print("Cannot insert coverage data: ", str(e))

        # Check for dumped coverage files
        for file_path in glob.glob("coverage.*"):
            try:
                body = read_file_as_binary_string(file_path)
                clickhouse_execute(
                    args,
                    "INSERT INTO system.coverage_log "
                    "SETTINGS async_insert=1, wait_for_async_insert=0, async_insert_busy_timeout_min_ms=200, async_insert_busy_timeout_max_ms=1000 "
                    f"SELECT now(), '{self.case}', groupArray(data) FROM input('data UInt64') FORMAT RowBinary",
                    body=body,
                    retry_error_codes=True,
                )
            except Exception as e:
                print("Cannot insert coverage data: ", str(e))
            # Remove the file even in case of exception to avoid accumulation and quadratic complexity.
            os.remove(file_path)

        _ = clickhouse_execute(args, "SYSTEM FLUSH ASYNC INSERT QUEUE")

        coverage = clickhouse_execute(
            args,
            "SELECT length(coverageCurrent())",
            retry_error_codes=True,
        ).decode()

        pieces.append(f" (coverage: {coverage})")

    pieces.append("\n")

    if result.status == TestStatus.FAIL and self.testcase_args:
        pieces.append("Database: " + self.testcase_args.testcase_database)

    result.description = "".join(pieces)
    return result
|
|
|
|
@staticmethod
|
|
def send_test_name_failed(suite: str, case: str):
    """Run a trivial SELECT on the server that mentions the suite, the test
    and the pid of this process.

    Nothing is returned; any error raised by ``clickhouse_execute`` propagates.
    """
    marker_query = f"SELECT 'Running test {suite}/{case} from pid={os.getpid()}'"
    clickhouse_execute(args, marker_query, retry_error_codes=True)
|
|
|
|
def run_single_test(
    self, server_logs_level, client_options
) -> Tuple[Optional[Popen], str, str, str, float]:
    """Execute the test file once and collect its output.

    The test is started through the shell in its own session, with stdout and
    stderr redirected to ``self.stdout_file`` / ``self.stderr_file``; ``.sql``
    tests are piped into the client.  The process is waited for at most
    ``args.timeout`` seconds; if it is still running afterwards it is returned
    as is (``returncode`` stays None).

    Afterwards run-specific values in the stdout file (database name, host
    name, ports, data path) are replaced with fixed ones so the output can be
    compared with the reference.

    Returns:
        tuple ``(proc, stdout, stderr, debug_log, total_time)``.
    """
    args = self.testcase_args
    database = args.testcase_database

    if args.client_log:
        log_opt = " --client_logs_file=" + args.client_log + " "
        client_options += log_opt
        os.environ["CLICKHOUSE_CLIENT_OPT"] = (
            os.environ.get("CLICKHOUSE_CLIENT_OPT", "") + log_opt
        )

    # This is for .sh tests
    os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename

    query_params = ""
    if "need-query-parameters" in self.tags:
        query_params = (
            f" --param_CLICKHOUSE_DATABASE={database}"
            f" --param_CLICKHOUSE_DATABASE_1={database}_1"
        )

    params = {
        "client": args.testcase_client + " --database=" + database + query_params,
        "logs_level": server_logs_level,
        "options": client_options,
        "test": self.case_file,
        "stdout": self.stdout_file,
        "stderr": self.stderr_file,
        "secure": "--secure" if args.secure else "",
    }

    # >> append to stderr (but not stdout since it is not used there),
    # because there are also output of per test database creation
    pattern = "{test} > {stdout} 2> {stderr}"

    if self.ext == ".sql":
        if args.cloud:
            # Get at least some logs, because we don't have access to system.text_log and pods...
            pattern = (
                "{client} --send_logs_level={logs_level} {secure} --multiquery {options}"
                " --send_logs_level=trace < {test} > {stdout} 2>> /test_output/some_logs_from_server.log"
            )
        else:
            pattern = (
                "{client} --send_logs_level={logs_level} {secure} --multiquery {options} < "
                + pattern
            )

    # We want to calculate per-test code coverage. That's why we reset it before each test.
    if (
        args.collect_per_test_coverage
        and args.reset_coverage_before_every_test
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    ):
        clickhouse_execute(
            args,
            "SYSTEM RESET COVERAGE",
            retry_error_codes=True,
        )

    command = pattern.format(**params)

    # pylint:disable-next=consider-using-with; TODO: fix
    proc = Popen(command, shell=True, env=os.environ, start_new_session=True)

    try:
        proc.wait(args.timeout)
    except subprocess.TimeoutExpired:
        # Whether the test timed out will be decided later
        pass

    debug_log = ""
    if os.path.exists(self.testcase_args.debug_log_file):
        with open(self.testcase_args.debug_log_file, "rb") as stream:
            debug_log += self.testcase_args.debug_log_file + ":\n"
            debug_log += stream.read().decode("utf-8", errors="replace")
            debug_log += "\n"

    total_time = (datetime.now() - args.testcase_start_time).total_seconds()

    # Normalize randomized database names in stdout, stderr files.
    replace_in_file(self.stdout_file, database, "default")
    if args.hide_db_name:
        replace_in_file(self.stderr_file, database, "default")
    if args.replicated_database:
        replace_in_file(self.stdout_file, "/auto_{shard}", "")
        replace_in_file(self.stdout_file, "auto_{replica}", "")

    # Normalize hostname in stdout file.
    replace_in_file(self.stdout_file, socket.gethostname(), "localhost")

    # Normalize custom native-protocol ports to the fixed values 9000 / 9440.
    for variable, fixed_port in (
        ("CLICKHOUSE_PORT_TCP", "9000"),
        ("CLICKHOUSE_PORT_TCP_SECURE", "9440"),
    ):
        actual_port = os.environ.get(variable)
        if actual_port:
            for prefix in ("PORT", "localhost"):
                replace_in_file(
                    self.stdout_file,
                    f"{prefix} {actual_port}",
                    f"{prefix} {fixed_port}",
                )

    if os.environ.get("CLICKHOUSE_PATH"):
        replace_in_file(
            self.stdout_file,
            os.environ["CLICKHOUSE_PATH"],
            "/var/lib/clickhouse",
        )

    if os.environ.get("CLICKHOUSE_PORT_HTTPS"):
        replace_in_file(
            self.stdout_file,
            f"https://localhost:{os.environ['CLICKHOUSE_PORT_HTTPS']}/",
            "https://localhost:8443/",
        )

    # Undecodable bytes are replaced rather than failing the run.
    stdout = ""
    if os.path.exists(self.stdout_file):
        with open(self.stdout_file, "rb") as stdfd:
            stdout = stdfd.read().decode("utf-8", errors="replace")

    stderr = ""
    if os.path.exists(self.stderr_file):
        with open(self.stderr_file, "rb") as stdfd:
            stderr += stdfd.read().decode("utf-8", errors="replace")

    return proc, stdout, stderr, debug_log, total_time
|
|
|
|
def run(self, args, suite, client_options, server_logs_level):
    """Run the test case once and return its TestResult.

    Steps: skip check, optional server health check (``args.testname``),
    per-test configuration, adding random settings, executing the test and
    evaluating its output, and cleanup.  When the test and reference files
    are valid UTF-8 the run happens inside two SharedEngineReplacer contexts
    (one for the test file, one for the reference file).

    KeyboardInterrupt is re-raised; every other exception is converted into
    a result (INTERNAL_QUERY_FAIL, TIMEOUT, SERVER_DIED or INTERNAL_ERROR).
    The random settings are always removed from the environment at the end.
    """
    start_time = datetime.now()

    def run_and_evaluate(options):
        # One execution of the test plus evaluation of its output.
        proc, stdout, stderr, debug_log, total_time = self.run_single_test(
            server_logs_level, options
        )
        outcome = self.process_result_impl(
            proc, stdout, stderr, debug_log, total_time
        )
        outcome.check_if_need_retry(args, stdout, stderr, self.runs_count)
        # to avoid breaking CSV parser
        outcome.description = outcome.description.replace("\0", "")
        return outcome

    def result_from_exception(status, reason, with_settings=True):
        # Must be called while the exception is being handled.
        elapsed = (datetime.now() - start_time).total_seconds()
        text = self.get_description_from_exception_info(sys.exc_info())
        if with_settings:
            text = self.add_info_about_settings(text)
        return TestResult(self.name, status, reason, elapsed, text)

    try:
        skip_reason = self.should_skip_test(suite)
        if skip_reason is not None:
            return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0.0, "")

        if args.testname:
            try:
                self.send_test_name_failed(suite.suite, self.case)
            except Exception:
                return TestResult(
                    self.name,
                    TestStatus.FAIL,
                    FailureReason.SERVER_DIED,
                    0.0,
                    "\nServer does not respond to health check\n",
                )

        self.runs_count += 1
        self.testcase_args = self.configure_testcase_args(
            args, self.case_file, suite.suite_tmp_path
        )

        client_options = self.add_random_settings(client_options)

        has_invalid_utf8 = not is_valid_utf_8(self.case_file) or (
            self.reference_file and not is_valid_utf_8(self.reference_file)
        )
        if has_invalid_utf8:
            result = run_and_evaluate(client_options)
        else:
            with SharedEngineReplacer(
                self.case_file,
                args.replace_replicated_with_shared,
                args.replace_non_replicated_with_shared,
                False,
            ):
                with SharedEngineReplacer(
                    self.reference_file,
                    args.replace_replicated_with_shared,
                    args.replace_non_replicated_with_shared,
                    True,
                ):
                    result = run_and_evaluate(client_options)

        if result.status == TestStatus.FAIL:
            result.description = self.add_info_about_settings(result.description)

        self._cleanup(result.status == TestStatus.OK)

        return result
    except KeyboardInterrupt as e:
        raise e
    except HTTPError:
        return result_from_exception(
            TestStatus.FAIL, FailureReason.INTERNAL_QUERY_FAIL
        )
    except socket.timeout:
        return result_from_exception(TestStatus.FAIL, FailureReason.TIMEOUT)
    except (ConnectionError, http.client.ImproperConnectionState):
        return result_from_exception(TestStatus.FAIL, FailureReason.SERVER_DIED)
    except Exception:
        # Unlike the cases above, the settings are not appended here.
        return result_from_exception(
            TestStatus.UNKNOWN, FailureReason.INTERNAL_ERROR, with_settings=False
        )
    finally:
        self.remove_random_settings_from_env()
|
|
|
|
def _cleanup(self, passed):
    """Drop the per-test database and remove the per-test temporary directory.

    Nothing is done when the database was given explicitly
    (``args.database``), or when ``args.no_drop_if_fail`` is set and the test
    did not pass.  The timeout for the cleanup queries is what is left of
    ``args.timeout``, but at least 20 seconds.
    """
    args = self.testcase_args

    if args.database:
        return
    if args.no_drop_if_fail and not passed:
        return

    elapsed = (datetime.now() - args.testcase_start_time).total_seconds()
    remaining = max(args.timeout - elapsed, 20)

    self._cleanup_database(args, remaining)
    shutil.rmtree(args.test_tmp_dir)
|
|
|
|
def _cleanup_database(self, args, timeout):
    """Verify that the test left no tables behind and drop its database.

    Args:
        args: per-test arguments (``testcase_database``, ``testcase_basename``
            and ``replicated_database`` are used).
        timeout: timeout passed to every query issued here.

    Raises:
        TestException: a test whose file name sorts at or after "02800" left
            tables in its database (the database is not dropped then).
        HTTPError: dropping the database failed with a non-retryable error.
    """
    database = args.testcase_database

    # Check if the test does not cleanup its tables.
    # Only for newly added tests. Please extend this check to the old tests as well.
    #
    # The comparison has to use the file name: self.case_file is a full path,
    # and comparing e.g. "/path/to/02900_x.sql" with "02800" compares the
    # leading "/" with "0", which made the check dead code for absolute paths.
    if os.path.basename(self.case_file) >= "02800":
        leftover_tables = (
            clickhouse_execute(
                args,
                f"SHOW TABLES FROM {database}",
                timeout=timeout,
                settings={
                    "log_comment": args.testcase_basename,
                },
            )
            .decode()
            .replace("\n", ", ")
        )

        if len(leftover_tables) != 0:
            raise TestException(
                f"The test should cleanup its tables ({leftover_tables}), otherwise it is inconvenient for running it locally."
            )

    drop_database_query = f"DROP DATABASE IF EXISTS {database}"
    if args.replicated_database:
        drop_database_query += " ON CLUSTER test_cluster_database_replicated"

    # It's possible to get an error "New table appeared in database being dropped or detached. Try again."
    # NOTE(review): if every attempt hits a retryable error the loop ends
    # silently and the database stays; confirm that this is acceptable.
    for _ in range(1, 60):
        try:
            clickhouse_execute(
                args,
                drop_database_query,
                timeout=timeout,
                settings={
                    "log_comment": args.testcase_basename,
                },
            )
        except HTTPError as e:
            if need_retry(args, e.message, e.message, 0):
                continue
            raise
        break
|
|
|
|
|
|
class TestSuite:
|
|
@staticmethod
|
|
def tests_in_suite_key_func(item: str) -> float:
    """Sort key used to order the tests of a suite.

    With ``args.order == "random"`` a random number is returned.  Otherwise
    the key is the integer prefix before the first ``_`` of the file name,
    negated unless the order is "asc".  Names without ``_`` get 99998 and
    names with a non-numeric prefix get 99997.
    """
    if args.order == "random":
        return random.random()

    direction = 1 if args.order == "asc" else -1

    prefix, underscore, _ = item.partition("_")
    if not underscore:
        return 99998

    try:
        return direction * int(prefix)
    except ValueError:
        return 99997
|
|
|
|
@staticmethod
|
|
def render_test_template(j2env, suite_dir, test_name):
    """
    Render template for test and reference file if needed

    Without a Jinja environment the name is returned untouched.  Otherwise
    ``<base>.reference.j2`` (if present) is rendered to
    ``<base>.gen.reference``, and a ``.sql.j2`` test is rendered to
    ``<base>.gen.sql``, whose name is returned instead of ``test_name``.
    """
    if j2env is None:
        return test_name

    base_name = removesuffix(test_name, ".sql.j2", ".sql")
    base_path = os.path.join(suite_dir, base_name)

    reference_template = base_name + ".reference.j2"
    if os.path.isfile(os.path.join(suite_dir, reference_template)):
        rendered_reference = j2env.get_template(reference_template).stream()
        rendered_reference.dump(base_path + ".gen.reference")

    if not test_name.endswith(".sql.j2"):
        return test_name

    generated_test_name = base_name + ".gen.sql"
    rendered_test = j2env.get_template(test_name).stream()
    rendered_test.dump(os.path.join(suite_dir, generated_test_name))
    return generated_test_name
|
|
|
|
@staticmethod
|
|
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
    """Read the tags of every test file of a suite.

    Tags are taken from the first non-empty line that is not a shebang, and
    only if that line is a comment of the form ``<comment sign> Tags: a, b``.
    In addition, ``.sql`` tests that mention ``{CLICKHOUSE_DATABASE`` on any
    line (the first one included) get the ``need-query-parameters`` tag.

    Args:
        suite_dir: directory containing the test files.
        all_tests: test file names relative to ``suite_dir``.

    Returns:
        dict from test name to its set of tags; tests without tags are
        omitted.  Files that cannot be decoded as UTF-8 up to the tag line
        have no tags.

    Raises:
        TestException: a file has an extension with no known comment sign.
    """
    query_param_marker = "{CLICKHOUSE_DATABASE"

    def get_comment_sign(filename):
        if filename.endswith((".sql", ".sql.j2")):
            return "--"
        if filename.endswith((".sh", ".py", ".expect")):
            return "#"
        raise TestException(f"Unknown file_extension: {filename}")

    def parse_tags_from_line(line, comment_sign) -> Set[str]:
        if not line.startswith(comment_sign):
            return set()
        tags_str = line[len(comment_sign) :].lstrip()  # noqa: ignore E203
        tags_prefix = "Tags:"
        if not tags_str.startswith(tags_prefix):
            return set()
        tags_str = tags_str[len(tags_prefix) :]  # noqa: ignore E203
        return {tag.strip() for tag in tags_str.split(",")}

    def is_shebang(line: str) -> bool:
        return line.startswith("#!")

    def find_tag_line(file):
        # First non-empty line that is not a shebang, stripped; "" if none.
        for line in file:
            line = line.strip()
            if line and not is_shebang(line):
                return line
        return ""

    def load_tags_from_file(filepath):
        comment_sign = get_comment_sign(filepath)
        need_query_params = False
        with open(filepath, "r", encoding="utf-8") as file:
            try:
                tag_line = find_tag_line(file)
            except UnicodeDecodeError:
                return set()
            if filepath.endswith(".sql"):
                # find_tag_line() has already consumed the candidate line, so
                # it must be checked here; the loop below only sees the rest.
                need_query_params = query_param_marker in tag_line
                if not need_query_params:
                    try:
                        need_query_params = any(
                            query_param_marker in line for line in file
                        )
                    except UnicodeDecodeError:
                        # Undecodable tail: keep what has been found so far.
                        pass
        parsed_tags = parse_tags_from_line(tag_line, comment_sign)
        if need_query_params:
            parsed_tags.add("need-query-parameters")
        return parsed_tags

    all_tags = {}
    start_time = datetime.now()
    for test_name in all_tests:
        tags = load_tags_from_file(os.path.join(suite_dir, test_name))
        if tags:
            all_tags[test_name] = tags
    elapsed = (datetime.now() - start_time).total_seconds()
    if elapsed > 1:
        print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
    return all_tags
|
|
|
|
def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
    """Discover the tests of a suite, read their tags and split them into
    sequential and parallel ones.

    When both ``args.run_by_hash_num`` and ``args.run_by_hash_total`` are
    given, only tests whose ``stringhash`` modulo the total equals the
    number are selected.

    Raises:
        TestException: ``run_by_hash_num`` is bigger than ``run_by_hash_total``.
    """
    self.args = args
    self.suite_path: str = suite_path
    self.suite_tmp_path: str = suite_tmp_path
    self.suite: str = suite
    self.cloud_skip_list: List[str] = []
    self.private_skip_list: List[str] = []

    run_by_hash = (
        args.run_by_hash_num is not None and args.run_by_hash_total is not None
    )
    # NOTE(review): a number equal to the total passes this check although the
    # filter below can then never match - confirm whether ">=" was intended.
    if run_by_hash and args.run_by_hash_num > args.run_by_hash_total:
        raise TestException(
            f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}"
        )

    def filter_func(x: str) -> bool:
        if not run_by_hash:
            return True
        return bool(stringhash(x) % args.run_by_hash_total == args.run_by_hash_num)

    self.all_tests: List[str] = self.get_tests_list(
        self.tests_in_suite_key_func, filter_func
    )
    self.all_tags: Dict[str, Set[str]] = self.read_test_tags(
        self.suite_path, self.all_tests
    )

    self.sequential_tests = []
    self.parallel_tests = []
    for test_name in self.all_tests:
        bucket = (
            self.sequential_tests
            if self.is_sequential_test(test_name)
            else self.parallel_tests
        )
        bucket.append(test_name)
|
|
|
|
def is_sequential_test(self, test_name):
    """Tell whether the test must not be run in parallel with others.

    That is the case when its name contains one of the substrings from the
    global ``args.sequential``, or when it is tagged ``no-parallel`` or
    ``sequential``.
    """
    if args.sequential and any(s in test_name for s in args.sequential):
        return True

    if test_name not in self.all_tags:
        return False

    tags = self.all_tags[test_name]
    return any(marker in tags for marker in ("no-parallel", "sequential"))
|
|
|
|
def get_tests_list(self, sort_key, filter_func):
    """
    Return the ordered list of test file names to run.

    The selected tests are repeated ``self.args.test_runs`` times and then
    sorted with ``sort_key``.
    """
    selected = list(self.get_selected_tests(filter_func))
    return sorted(selected * self.args.test_runs, key=sort_key)
|
|
|
|
def get_selected_tests(self, filter_func):
    """
    Yield the names of the test files of this suite that should be run.

    Every directory entry is checked in turn: it must be a test file, match
    at least one regex from ``self.args.test`` (when any were given), not be a
    ``.gen.sql`` file while jinja2 is available, and be accepted by
    ``filter_func``.  Each surviving name is passed through
    ``render_test_template`` and the result is yielded.
    """
    j2env = None
    if USE_JINJA:
        j2env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.suite_path),
            keep_trailing_newline=True,
        )
        j2env.globals.update(product=itertools.product)

    for file_name in os.listdir(self.suite_path):
        # The checks are evaluated lazily, in the listed order.
        skipped = (
            not is_test_from_dir(self.suite_path, file_name)
            or (
                self.args.test
                and not any(re.search(regex, file_name) for regex in self.args.test)
            )
            or (USE_JINJA and file_name.endswith(".gen.sql"))
            or not filter_func(file_name)
        )
        if skipped:
            continue
        yield self.render_test_template(j2env, self.suite_path, file_name)
|
|
|
|
@staticmethod
def read_test_suite(args, suite_dir_name: str):
    """
    Build a TestSuite for one entry of the queries directory.

    :param args: parsed command-line arguments (``queries``, ``tmp``,
        ``no_stateful`` and ``no_stateless`` are read here)
    :param suite_dir_name: name of an entry inside ``args.queries``; it must
        look like ``<digits>_<suite name>`` to be treated as a suite
    :return: a TestSuite, or None when the entry is not a suite directory or
        the suite is disabled / cannot be run
    """

    def is_data_present():
        try:
            return int(clickhouse_execute(args, "EXISTS TABLE test.hits"))
        except Exception as e:
            print(
                "Cannot check if dataset is available, assuming it's not: ", str(e)
            )
            return False

    base_dir = os.path.abspath(args.queries)
    tmp_dir = os.path.abspath(args.tmp)
    suite_path = os.path.join(base_dir, suite_dir_name)

    suite_re_obj = re.search("^[0-9]+_(.*)$", suite_dir_name)
    if not suite_re_obj:  # skip .gitignore and so on
        return None

    suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
    # Create the directory directly instead of "check, then create": another
    # process may create it between the two steps.
    try:
        os.makedirs(suite_tmp_path)
    except FileExistsError:
        pass

    suite = suite_re_obj.group(1)

    if not os.path.isdir(suite_path):
        return None

    if "stateful" in suite and not args.no_stateful and not is_data_present():
        print("Won't run stateful tests because test data wasn't loaded.")
        return None
    if "stateless" in suite and args.no_stateless:
        print("Won't run stateless tests because they were manually disabled.")
        return None
    if "stateful" in suite and args.no_stateful:
        print("Won't run stateful tests because they were manually disabled.")
        return None

    return TestSuite(args, suite_path, suite_tmp_path, suite)
|
|
|
|
|
|
# Module-level run state. Every name starts as None here; the functions below
# access them through `global` declarations.
# NOTE(review): the code that replaces these None values with real objects is
# not visible in this section - confirm where they are initialised.

# Deadline compared against time() by run_tests_array(); set in main() when a
# global time limit is given.
stop_time = None
# Object whose `.value` attribute holds the process exit status.
exit_code = None
# Flag object used through `set()` / `is_set()` once the server is considered dead.
server_died = None
multiprocessing_manager = None
# Collection of results of tests that were retried; reported at the end of main().
restarted_tests = None
|
|
|
|
|
|
class ServerDied(Exception):
    """Raised to abort a test run (server died or too many failures in a row)."""
|
|
|
|
|
|
class GlobalTimeout(Exception):
    """Raised to stop a test run once the global time limit is exceeded."""
|
|
|
|
|
|
def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool]):
    """
    Run test cases taken from a list until it is empty, printing the result
    of every test and a final summary.

    The single argument is a tuple of:

    * ``all_tests`` - list of test file names; it is consumed with ``pop(0)``;
    * ``num_tests`` - number shown in the "Running ... tests" banner;
    * ``test_suite`` - suite the tests belong to;
    * ``is_concurrent`` - when True the test name is printed together with
      the result instead of before the test starts, so that lines of
      different workers do not mix.

    The module-level ``args`` and ``server_logs_level`` are read, and
    ``exit_code.value`` is set to 1 if at least one test failed.

    :raises ServerDied: if the server-died flag is set, a test failed with
        ``FailureReason.SERVER_DIED`` or ``args.max_failures_chain``
        consecutive tests failed
    :raises GlobalTimeout: if ``stop_time`` has passed
    """
    (
        all_tests,
        num_tests,
        test_suite,
        is_concurrent,
    ) = all_tests_with_params
    global stop_time
    global exit_code
    global server_died
    global restarted_tests

    OP_SQUARE_BRACKET = colored("[", args, attrs=["bold"])
    CL_SQUARE_BRACKET = colored("]", args, attrs=["bold"])

    def status_label(text, color):
        # "[ TEXT ]" with bold brackets and a bold coloured status word
        return (
            OP_SQUARE_BRACKET
            + colored(text, args, color, attrs=["bold"])
            + CL_SQUARE_BRACKET
        )

    MSG_FAIL = status_label(" FAIL ", "red")
    MSG_UNKNOWN = status_label(" UNKNOWN ", "yellow")
    MSG_OK = status_label(" OK ", "green")
    MSG_SKIPPED = status_label(" SKIPPED ", "cyan")

    MESSAGES = {
        TestStatus.FAIL: MSG_FAIL,
        TestStatus.UNKNOWN: MSG_UNKNOWN,
        TestStatus.OK: MSG_OK,
        TestStatus.SKIPPED: MSG_SKIPPED,
    }

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures_chain = 0
    start_time = datetime.now()

    client_options = get_additional_client_options(args)

    if num_tests > 0:
        about = "about " if is_concurrent else ""
        proc_name = multiprocessing.current_process().name
        print(f"Running {about}{num_tests} {test_suite.suite} tests ({proc_name}).")

    # This is the upper level timeout
    # It helps with completely frozen processes, like in case of gdb errors
    def timeout_handler(signum, frame):
        raise TimeoutError("Test execution timed out")

    while True:
        if all_tests:
            try:
                case = all_tests.pop(0)
            except IndexError:
                # the list became empty between the check and the pop
                break
        else:
            break

        if server_died.is_set():
            stop_tests()
            raise ServerDied("Server died")

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            stop_tests()
            raise GlobalTimeout("Stop tests run because global time limit is exceeded")

        test_case = TestCase(test_suite, case, args, is_concurrent)

        try:
            description = ""
            test_case_name = removesuffix(test_case.name, ".gen", ".sql") + ": "

            if is_concurrent:
                description = f"{test_case_name:72}"
            else:
                sys.stdout.flush()
                sys.stdout.write(f"{test_case_name:72}")
                # This flush is needed so you can see the test name of the long
                # running test before it will finish. But don't do it in parallel
                # mode, so that the lines don't mix.
                sys.stdout.flush()

            hard_timeout = int(args.timeout * 1.1)
            # Retry loop: the test is run again while its result asks for a retry.
            while True:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(hard_timeout)
                test_result = None
                try:
                    test_result = test_case.run(
                        args, test_suite, client_options, server_logs_level
                    )
                    test_result = test_case.process_result(test_result, MESSAGES)
                except TimeoutError:
                    break
                finally:
                    signal.alarm(0)

                if not test_result or not test_result.need_retry:
                    break
                restarted_tests.append(test_result)

            if test_result is None:
                # The hard timeout fired before run() returned anything, so
                # there is no result object to report: count the test as
                # failed instead of crashing on a missing result.
                description += (
                    f"{MSG_FAIL} - Test execution timed out after {hard_timeout} s\n"
                )
                sys.stdout.write(description)
                sys.stdout.flush()
                failures_total += 1
                failures_chain += 1
            else:
                # First print the description, then invoke the check result logic
                description += test_result.description

                if description and not description.endswith("\n"):
                    description += "\n"

                sys.stdout.write(description)
                sys.stdout.flush()

                if test_result.status == TestStatus.OK:
                    passed_total += 1
                    failures_chain = 0
                elif test_result.status == TestStatus.FAIL:
                    failures_total += 1
                    failures_chain += 1
                    if test_result.reason == FailureReason.SERVER_DIED:
                        stop_tests()
                        server_died.set()
                        raise ServerDied("Server died")
                elif test_result.status == TestStatus.SKIPPED:
                    skipped_total += 1

        except KeyboardInterrupt:
            print(colored("Break tests execution", args, "red"))
            stop_tests()
            raise

        if failures_chain >= args.max_failures_chain:
            stop_tests()
            raise ServerDied("Max failures chain")

    if failures_total > 0:
        print(
            colored(
                f"\nHaving {failures_total} errors! {passed_total} tests passed."
                f" {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "red",
                attrs=["bold"],
            )
        )
        exit_code.value = 1
    else:
        print(
            colored(
                f"\n{passed_total} tests passed. {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "green",
                attrs=["bold"],
            )
        )

    sys.stdout.flush()
|
|
|
|
|
|
# Server log level for test clients: main() exports it as the default of the
# CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL environment variable and
# run_tests_array() passes it to every test case run.
server_logs_level = "warning"
|
|
|
|
|
|
def check_server_started(args):
    """
    Wait until the server answers a version query.

    Connection errors are retried up to ``args.server_check_retries`` times
    with a 0.5 second pause; a timeout or any other exception stops the
    attempts immediately.

    :param args: parsed command-line arguments
    :return: True as soon as the server answered, False otherwise
    """
    print("Connecting to ClickHouse server...", end="")

    sys.stdout.flush()
    retry_count = args.server_check_retries
    query = "SELECT version(), arrayStringConcat(groupArray(value), ' ') FROM system.build_options WHERE name IN ('GIT_HASH', 'GIT_BRANCH')"
    while retry_count > 0:
        try:
            res = clickhouse_execute(args, query).decode().strip().replace("\t", " @ ")
            print(" OK")
            print(f"Connected to server {res}")
            sys.stdout.flush()
            return True
        except (ConnectionError, http.client.ImproperConnectionState) as e:
            if args.hung_check:
                print("Connection error, will retry: ", str(e))
            else:
                # compact progress indicator instead of the full error text
                print(".", end="")
                sys.stdout.flush()
            retry_count -= 1
            sleep(0.5)
            continue
        except TimeoutError:
            print("\nConnection timeout, will not retry")
            break
        except Exception as e:
            print(
                "\nUnexpected exception, will not retry: ",
                type(e).__name__,
                ": ",
                str(e),
            )
            break

    print("\nAll connection tries failed")
    sys.stdout.flush()
    return False
|
|
|
|
|
|
class BuildFlags:
    """Names of the server build / configuration flags collected by collect_build_flags()."""

    # sanitizers and coverage instrumentation
    THREAD = "tsan"
    ADDRESS = "asan"
    UNDEFINED = "ubsan"
    MEMORY = "msan"
    SANITIZE_COVERAGE = "sanitize-coverage"

    # build type
    DEBUG = "debug"
    RELEASE = "release"

    # server settings
    ORDINARY_DATABASE = "ordinary-database"
    POLYMORPHIC_PARTS = "polymorphic-parts"
|
|
|
|
|
|
def collect_build_flags(args):
    """
    Query the server and return a list of flag names describing its build
    and configuration.

    The list contains at most one sanitizer/coverage flag, at most one build
    type flag, optional ``ordinary-database`` and ``polymorphic-parts``
    flags, the lower-cased name of every enabled ``USE_*`` build option and a
    ``cpu-<processor>`` entry when the processor is reported.
    """
    flags = []

    cxx_flags = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'"
    )
    # Only the first matching marker is reported.
    instrumentation_markers = (
        (b"-fsanitize=thread", BuildFlags.THREAD),
        (b"-fsanitize=address", BuildFlags.ADDRESS),
        (b"-fsanitize=undefined", BuildFlags.UNDEFINED),
        (b"-fsanitize=memory", BuildFlags.MEMORY),
        (b"-DSANITIZE_COVERAGE=1", BuildFlags.SANITIZE_COVERAGE),
    )
    for marker, flag in instrumentation_markers:
        if marker in cxx_flags:
            flags.append(flag)
            break

    build_type = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
    )
    if b"Debug" in build_type:
        flags.append(BuildFlags.DEBUG)
    elif b"RelWithDebInfo" in build_type or b"Release" in build_type:
        flags.append(BuildFlags.RELEASE)

    allow_ordinary = clickhouse_execute(
        args,
        "SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'",
    )
    if allow_ordinary == b"1" or args.db_engine == "Ordinary":
        flags.append(BuildFlags.ORDINARY_DATABASE)

    min_bytes_for_wide_part = int(
        clickhouse_execute(
            args,
            "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'",
        )
    )
    if min_bytes_for_wide_part == 0:
        flags.append(BuildFlags.POLYMORPHIC_PARTS)

    enabled_options = clickhouse_execute(
        args,
        "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')",
    )
    flags.extend(
        option.decode().lower() for option in enabled_options.strip().splitlines()
    )

    system_processor = clickhouse_execute(
        args,
        "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1",
    ).strip()
    if system_processor:
        flags.append(f"cpu-{system_processor.decode().lower()}")

    return flags
|
|
|
|
|
|
def collect_changed_merge_tree_settings(args):
    """Return the names of the MergeTree settings the server reports as changed."""
    raw_names = clickhouse_execute(
        args,
        "SELECT name FROM system.merge_tree_settings WHERE changed",
    )
    return [name.decode() for name in raw_names.strip().splitlines()]
|
|
|
|
|
|
def check_table_column(args, database, table, column):
    """Return True if ``system.columns`` lists ``column`` for ``database.table``."""
    query = f"""
            SELECT count()
            FROM system.columns
            WHERE database = '{database}' AND table = '{table}' AND name = '{column}'
            """
    matching_columns = int(clickhouse_execute(args, query))
    return matching_columns > 0
|
|
|
|
|
|
def suite_key_func(item: str) -> Union[float, Tuple[int, str]]:
    """
    Sort key for entries of the queries directory.

    With ``--order random`` a random number is returned.  Otherwise
    ``<number>_<rest>`` maps to ``(number, rest)``, names without an
    underscore map to ``(99998, "")`` and names whose prefix is not an
    integer map to ``(99997, "")``.
    """
    if args.order == "random":
        return random.random()

    prefix, separator, suffix = item.partition("_")
    if not separator:
        return 99998, ""

    try:
        number = int(prefix)
    except ValueError:
        return 99997, ""
    return number, suffix
|
|
|
|
|
|
def extract_key(key: str) -> str:
    """
    Run the extract-from-config program against the server config and return
    its output.

    ``key`` is appended verbatim to the shell command line, so it carries
    the ``--key ...`` option and may contain further shell syntax.
    """
    command = args.extract_from_config + " --try --config " + args.configserver + key
    _status, output = subprocess.getstatusoutput(command)
    return output
|
|
|
|
|
|
def run_tests_process(*positional, **named):
    """Entry point of a worker process: forward everything to run_tests_array()."""
    result = run_tests_array(*positional, **named)
    return result
|
|
|
|
|
|
def do_run_tests(jobs, test_suite: TestSuite):
    """
    Run all tests of a suite and return a number of tests.

    With a single job, or when the suite has no parallel tests, every test
    is run in the current process.  Otherwise the parallel tests are
    shuffled and handed to worker processes through a shared list, the
    server is polled for hangs while the workers run, and the sequential
    tests are run afterwards in the current process.

    :param jobs: requested number of worker processes
    :param test_suite: suite to run
    :return: ``len(test_suite.all_tests)`` on the single-process path,
        otherwise the sum of the lengths of the sequential and parallel
        test lists taken after the run
    """
    if not (jobs > 1 and len(test_suite.parallel_tests) > 0):
        num_tests = len(test_suite.all_tests)
        run_tests_array((test_suite.all_tests, num_tests, test_suite, False))
        return num_tests

    print(
        f"Found {len(test_suite.parallel_tests)} parallel tests and"
        f" {len(test_suite.sequential_tests)} sequential tests"
    )
    tests_n = len(test_suite.parallel_tests)
    jobs = min(jobs, tests_n)

    # If we don't do random shuffling then there will be always
    # nearly the same groups of test suites running concurrently.
    # Thus, if there is a test within group which appears to be broken
    # then it will affect all other tests in a non-random form.
    # So each time a bad test fails - other tests from the group will also fail
    # and this process will be more or less stable.
    # It makes it more difficult to detect real flaky tests,
    # because the distribution and the amount
    # of failures will be nearly the same for all tests from the group.
    random.shuffle(test_suite.parallel_tests)

    batch_size = len(test_suite.parallel_tests) // jobs
    manager = multiprocessing.Manager()
    shared_tests = manager.list()
    shared_tests.extend(test_suite.parallel_tests)

    workers = []
    for _ in range(jobs):
        worker = multiprocessing.Process(
            target=run_tests_process,
            args=((shared_tests, batch_size, test_suite, True),),
        )
        workers.append(worker)
        worker.start()

    while workers:
        sys.stdout.flush()
        # Periodically check the server for hangs
        # and stop all processes in this case
        try:
            clickhouse_execute(
                args,
                query="SELECT 1 /*hang up check*/",
                max_http_retries=5,
                timeout=20,
            )
        except Exception:
            print("Hang up check failed")
            server_died.set()

        if server_died.is_set():
            print("Server died, terminating all processes...")
            kill_gdb_if_any()
            # Wait for test results
            sleep(args.timeout)
            for worker in workers:
                if worker.is_alive():
                    worker.terminate()
            break

        # keep only the workers that are still running
        for worker in workers[:]:
            if not worker.is_alive():
                workers.remove(worker)

        sleep(5)

    sequential = (
        test_suite.sequential_tests,
        len(test_suite.sequential_tests),
        test_suite,
        False,
    )
    run_tests_array(sequential)

    return len(test_suite.sequential_tests) + len(test_suite.parallel_tests)
|
|
|
|
|
|
def is_test_from_dir(suite_dir, case):
    """Return True if ``case`` is a regular file in ``suite_dir`` with a test file extension."""
    case_file = os.path.join(suite_dir, case)
    # We could also test for executable files (os.access(case_file, os.X_OK),
    # but it interferes with 01610_client_spawn_editor.editor, which is invoked
    # as a query editor in the test, and must be marked as executable.
    if not os.path.isfile(case_file):
        return False
    return case_file.endswith(tuple(TEST_FILE_EXTENSIONS))
|
|
|
|
|
|
def removesuffix(text, *suffixes):
    """
    Added in python 3.9
    https://www.python.org/dev/peps/pep-0616/

    This version accepts several candidate suffixes: the first non-empty one
    that ``text`` ends with is removed (only that one); if none matches,
    ``text`` is returned unchanged.
    """
    matched = next(
        (candidate for candidate in suffixes if candidate and text.endswith(candidate)),
        None,
    )
    if matched is None:
        return text
    return text[: len(text) - len(matched)]
|
|
|
|
|
|
def reportCoverageFor(args, what, query, permissive=False):
    """
    Run ``query`` and print its output as the list of ``what`` not covered
    by tests.

    :return: True when the query returned nothing; otherwise ``permissive``
    """
    uncovered = clickhouse_execute(args, query).decode()
    if uncovered == "":
        return True

    print(f"\nThe following {what} were not covered by tests:\n")
    print(uncovered)
    print("\n")
    return permissive
|
|
|
|
|
|
# This is high-level coverage on per-component basis (functions, data types, etc.)
# Don't be confused with the code coverage.
def reportCoverage(args):
    """
    Report the functions, aggregate functions, combinators and data type
    families that do not appear in the query log since yesterday.

    The checks run in order and stop at the first one that fails; the check
    for ordinary functions is permissive and never fails.

    :return: True if every executed check passed
    """
    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    # (what, query, permissive)
    checks = (
        (
            "functions",
            """
            SELECT name
            FROM system.functions
            WHERE NOT is_aggregate AND origin = 'System' AND alias_to = ''
                AND name NOT IN
                (
                    SELECT arrayJoin(used_functions) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            True,
        ),
        (
            "aggregate functions",
            """
            SELECT name
            FROM system.functions
            WHERE is_aggregate AND origin = 'System' AND alias_to = ''
                AND name NOT IN
                (
                    SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
        (
            "aggregate function combinators",
            """
            SELECT name
            FROM system.aggregate_function_combinators
            WHERE NOT is_internal
                AND name NOT IN
                (
                    SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
        (
            "data type families",
            """
            SELECT name
            FROM system.data_type_families
            WHERE alias_to = '' AND name NOT LIKE 'Interval%'
                AND name NOT IN
                (
                    SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
    )

    # all() over a generator stops at the first failing check.
    return all(
        reportCoverageFor(args, what, query, permissive)
        for what, query, permissive in checks
    )
|
|
|
|
|
|
def reportLogStats(args):
    """
    Flush the server logs and print several statistics reports computed
    from ``system.text_log``, each under its own header.
    """
    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    # (header printed before the result, query)
    reports = (
        (
            "\nTop patterns of log messages:\n",
            """
        WITH
            240 AS mins,
            (
                SELECT (count(), sum(length(toValidUTF8(message))))
                FROM system.text_log
                WHERE (now() - toIntervalMinute(mins)) < event_time
            ) AS total
        SELECT
            count() AS count,
            round(count / (total.1), 3) AS `count_%`,
            formatReadableSize(sum(length(toValidUTF8(message)))) AS size,
            round(sum(length(toValidUTF8(message))) / (total.2), 3) AS `size_%`,
            countDistinct(logger_name) AS uniq_loggers,
            countDistinct(thread_id) AS uniq_threads,
            groupArrayDistinct(toString(level)) AS levels,
            round(sum(query_id = '') / count, 3) AS `background_%`,
            message_format_string
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time
        GROUP BY message_format_string
        ORDER BY count DESC
        LIMIT 100
        FORMAT PrettySpaceNoEscapes
    """,
        ),
        (
            "\nTop messages without format string (fmt::runtime):\n",
            """
        WITH
            240 AS mins
        SELECT
            count() AS count,
            substr(replaceRegexpAll(toValidUTF8(message), '[^A-Za-z]+', ''), 1, 32) AS pattern,
            substr(any(toValidUTF8(message)), 1, 256) as runtime_message,
            any((extract(source_file, '/[a-zA-Z0-9_]+\\.[a-z]+'), source_line)) as line
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
        GROUP BY pattern
        ORDER BY count DESC
        LIMIT 30
        FORMAT PrettySpaceNoEscapes
    """,
        ),
        (
            "\nTop messages not matching their format strings:\n",
            """
        SELECT message_format_string, count(), any(toValidUTF8(message)) AS any_message
        FROM system.text_log
        WHERE (now() - toIntervalMinute(240)) < event_time
        AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
        AND (message NOT LIKE concat('%Exception: ', s, '%'))
        GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT PrettySpaceNoEscapes
    """,
        ),
        (
            "\nTop short messages:\n",
            """
        WITH ('', '({}) Keys: {}', '({}) {}', 'Aggregating', 'Became leader', 'Cleaning queue',
              'Creating set.', 'Cyclic aliases', 'Detaching {}', 'Executing {}', 'Fire events: {}',
              'Found part {}', 'Loaded queue', 'No sharding key', 'No tables', 'Query: {}',
              'Removed', 'Removed part {}', 'Removing parts.', 'Request URI: {}', 'Sending part {}',
              'Sent handshake', 'Starting {}', 'Will mimic {}', 'Writing to {}', 'dropIfEmpty',
              'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}', '{}%', 'Read object: {}',
              'New segment: {}', 'Convert overflow', 'Division by zero', 'Files set to {}',
              'Bytes set to {}', 'Numeric overflow', 'Invalid mode: {}',
              'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
              'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
              'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
              'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
              'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
              'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
              'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}',
              'Query was cancelled', 'Cancelled merging parts', 'Cancelled mutating parts', 'Log pulling is cancelled',
              'Transaction was cancelled', 'Could not find table: {}', 'Table {} doesn''t exist',
              'Database {} doesn''t exist', 'Dictionary ({}) not found', 'Unknown table function {}',
              'Unknown format {}', 'Unknown explain kind ''{}''', 'Unknown setting {}', 'Unknown input format {}',
              'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
              'Attempt to read after eof', 'String size is too big ({}), maximum: {}'
        ) AS known_short_messages
        SELECT count() AS c, message_format_string, substr(any(toValidUTF8(message)), 1, 120),
            min(if(length(regexpExtract(toValidUTF8(message), '(.*)\\([A-Z0-9_]+\\)')) as prefix_len > 0, prefix_len, length(toValidUTF8(message))) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
        FROM system.text_log
        WHERE (now() - toIntervalMinute(240)) < event_time
            AND (length(message_format_string) < 16
                OR (message ILIKE '%DB::Exception%' AND length_without_exception_boilerplate < 30))
            AND message_format_string NOT IN known_short_messages
        GROUP BY message_format_string ORDER BY c DESC LIMIT 50 FORMAT PrettySpaceNoEscapes
    """,
        ),
        (
            "\nTop messages by level:\n",
            """
        SELECT max((freq, message_format_string)), level
        FROM (SELECT count() / (SELECT count() FROM system.text_log
              WHERE (now() - toIntervalMinute(240)) < event_time) AS freq,
              min(level) AS level, message_format_string FROM system.text_log
              WHERE (now() - toIntervalMinute(120)) < event_time
              GROUP BY message_format_string ORDER BY freq DESC)
        GROUP BY level
    """,
        ),
    )

    for header, query in reports:
        # undecodable bytes in log messages must not break the report
        output = clickhouse_execute(args, query).decode(errors="replace")
        print(header)
        print(output)
        print("\n")
|
|
|
|
|
|
def try_get_skip_list(base_dir, name):
    """
    Read test names from an optional skip-list file.

    The first whitespace-separated token of every line is taken as a test
    name.  Empty lines, whitespace-only lines and lines starting with a
    space are ignored.

    :param base_dir: directory the file name is relative to
    :param name: file name (may contain ``..`` components)
    :return: list of test names; empty if the file does not exist
    """
    skip_list_path = os.path.join(base_dir, name)
    if not os.path.exists(skip_list_path):
        return []

    test_names_to_skip = []
    with open(skip_list_path, "r", encoding="utf-8") as fd:
        for line in fd.read().split("\n"):
            if line.startswith(" "):
                continue
            # A whitespace-only line (for example a lone tab) has no tokens;
            # skip it instead of failing on an empty split() result.
            tokens = line.split()
            if tokens:
                test_names_to_skip.append(tokens[0])

    return test_names_to_skip
|
|
|
|
|
|
def main(args):
    """
    Run all test suites found in ``args.queries`` and exit the process.

    Steps: wait for the server, collect build flags and changed MergeTree
    settings, export default environment variables for test scripts, create
    the common databases, run every suite, optionally check for hung
    queries, print client fatal logs and restarted tests, optionally report
    log statistics and coverage.

    The function does not return: it ends with ``sys.exit`` (status 1 when
    no tests were run, otherwise ``exit_code.value``).

    :param args: parsed command-line arguments
    :raises TestException: if the server does not respond
    """
    global server_died
    global stop_time
    global exit_code
    global server_logs_level
    global restarted_tests

    if not check_server_started(args):
        msg = "Server is not responding. Cannot execute 'SELECT 1' query."
        if args.hung_check:
            print(msg)
            pid = get_server_pid()
            print("Got server pid", pid)
            print_stacktraces()
        raise TestException(msg)

    args.build_flags = collect_build_flags(args)
    args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)

    if args.s3_storage and (BuildFlags.RELEASE not in args.build_flags):
        args.no_random_settings = True

    if args.skip:
        args.skip = set(args.skip)

    if args.replace_replicated_with_shared:
        if not args.skip:
            args.skip = set([])
        args.skip = set(args.skip)

    base_dir = os.path.abspath(args.queries)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    # os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)

    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    if args.zookeeper is None:
        args.zookeeper = True

    if args.shard is None:
        args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))

    def create_common_database(args, db_name):
        # Try to create the database up to MAX_RETRIES times; an HTTP error
        # that does not qualify for a retry ends the attempts silently.
        create_database_retries = 0
        while create_database_retries < MAX_RETRIES:
            start_time = datetime.now()
            try:
                clickhouse_execute(
                    args,
                    f"CREATE DATABASE IF NOT EXISTS {db_name} "
                    f"{get_db_engine(args, db_name)}",
                    settings=get_create_database_settings(args, None),
                )
                break
            except HTTPError as e:
                total_time = (datetime.now() - start_time).total_seconds()
                if not need_retry(args, e.message, e.message, total_time):
                    break
            create_database_retries += 1

    try:
        if args.database and args.database != "test":
            create_common_database(args, args.database)

        create_common_database(args, "test")
    except Exception as e:
        print(f"Failed to create databases for tests: {e}")
        server_died.set()

    if (
        args.collect_per_test_coverage
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    ):
        clickhouse_execute(
            args,
            """
                CREATE TABLE IF NOT EXISTS system.coverage_log
                (
                    time DateTime,
                    test_name String,
                    coverage Array(UInt64)
                ) ENGINE = MergeTree ORDER BY test_name
                COMMENT 'Contains information about per-test coverage from the CI, but used only for exporting to the CI cluster';
            """,
        )

        # Coverage collected at the system startup before running any tests:
        clickhouse_execute(
            args,
            "INSERT INTO system.coverage_log SELECT now(), '', coverageCurrent()",
        )

    total_tests_run = 0
    cloud_skip_list = try_get_skip_list(base_dir, "../queries-no-cloud-tests.txt")
    private_skip_list = try_get_skip_list(base_dir, "../queries-no-private-tests.txt")

    for suite in sorted(os.listdir(base_dir), key=suite_key_func):
        if server_died.is_set():
            break

        test_suite = TestSuite.read_test_suite(args, suite)
        if test_suite is None:
            continue

        test_suite.cloud_skip_list = cloud_skip_list
        test_suite.private_skip_list = private_skip_list
        total_tests_run += do_run_tests(args.jobs, test_suite)

    if server_died.is_set():
        exit_code.value = 1

    if args.hung_check:
        # Some queries may execute in background for some time after test was finished. This is normal.
        print("Checking the hung queries: ", end="")
        hung_count = 0
        try:
            deadline = datetime.now() + timedelta(seconds=90)
            while datetime.now() < deadline:
                hung_count = get_processlist_size(args)
                if hung_count == 0:
                    print(" done")
                    break
                print(". ", end="")
                # Pause between polls instead of querying the server in a
                # tight loop for up to 90 seconds.
                sleep(1)
        except Exception as e:
            print(
                colored(
                    "\nHung check failed. Failed to get processlist size: " + str(e),
                    args,
                    "red",
                    attrs=["bold"],
                )
            )
            exit_code.value = 1

        processlist = ""
        if hung_count > 0:
            try:
                processlist = get_processlist_with_stacktraces(args)
            except Exception as e:
                print(
                    colored(
                        "\nHung check failed. Failed to get processlist with stacktraces: "
                        + str(e),
                        args,
                        "red",
                        attrs=["bold"],
                    )
                )
                exit_code.value = 1

        if processlist:
            print(
                colored(
                    "\nFound hung queries in processlist:", args, "red", attrs=["bold"]
                )
            )
            print(processlist.decode())
            print(get_transactions_list(args))

            print_stacktraces()
            exit_code.value = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    if args.client_log:
        if os.path.exists(args.client_log):
            with open(args.client_log, "rb") as stream:
                # The log may contain arbitrary bytes (e.g. a partially
                # written record); never fail the run while printing it.
                content = stream.read().decode(errors="replace")
                if len(content):
                    print("Has fatal logs from client:\n")
                    print(content)
            os.remove(args.client_log)

    if len(restarted_tests) > 0:
        print("\nSome tests were restarted:\n")

        for test_result in restarted_tests:
            print(f"\n{test_result.case_name:72}: ")
            # replace it with lowercase to avoid parsing retried tests as failed
            for status in TestStatus:
                test_result.description = test_result.description.replace(
                    status.value, status.value.lower()
                )
            print(test_result.description)

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)
    else:
        print("All tests have finished.")

    if args.report_logs_stats:
        try:
            reportLogStats(args)
        except Exception as e:
            print(f"Failed to get stats about log messages: {e}")

    if args.report_coverage and not reportCoverage(args):
        exit_code.value = 1

    sys.exit(exit_code.value)
|
|
|
|
|
|
def find_binary(name):
    """
    Resolve ``name`` to an executable path.

    ``name`` itself is returned when it is already executable; otherwise
    the directories of ``PATH`` are searched, followed by ``/usr/local/bin``
    and ``/usr/bin``.

    :param name: path to a binary or a bare binary name
    :return: ``name`` or the first executable candidate found
    :raises TestException: if no executable candidate exists
    """
    if os.access(name, os.X_OK):
        return name

    # PATH may be unset; treat that as an empty search path instead of
    # failing on None.
    search_dirs = os.environ.get("PATH", "").split(os.pathsep)
    # maybe it wasn't in PATH
    search_dirs += ["/usr/local/bin", "/usr/bin"]

    for directory in search_dirs:
        bin_path = os.path.join(directory, name)
        if os.access(bin_path, os.X_OK):
            return bin_path

    raise TestException(f"{name} was not found in PATH")
|
|
|
|
|
|
def find_clickhouse_command(binary, command):
    """
    Return the command line for a clickhouse sub-command.

    ``<binary>-<command>`` is preferred when it is executable; otherwise the
    multi-call form ``<binary> <command>`` is returned.
    """
    symlink = f"{binary}-{command}"
    if not os.access(symlink, os.X_OK):
        # To avoid requiring symlinks (in case you download binary from CI)
        return f"{binary} {command}"
    return symlink
|
|
|
|
|
|
def get_additional_client_options(args):
    """
    Build the extra command-line options for the client.

    The value of the ``CLICKHOUSE_CLIENT_OPT`` environment variable (when
    set) comes first, followed by every entry of ``args.client_option``
    prefixed with ``--``.
    """
    env_options = os.environ.get("CLICKHOUSE_CLIENT_OPT")

    if not args.client_option:
        return "" if env_options is None else env_options

    cli_options = " ".join("--" + option for option in args.client_option)
    if env_options is None:
        return cli_options
    return env_options + " " + cli_options
|
|
|
|
|
|
def get_additional_client_options_url(args):
    """Join ``args.client_option`` with ``&`` (empty string when there are none)."""
    return "&".join(args.client_option) if args.client_option else ""
|
|
|
|
|
|
def parse_args():
    """Define the command line interface of the test runner and parse it.

    Returns the namespace produced by ``ArgumentParser.parse_args()`` for the
    current process arguments. As usual for argparse, ``--help`` or an
    invalid command line terminates the process via ``SystemExit``.

    argparse applies %-formatting to help strings, so a literal percent sign
    in a help text has to be written as ``%%``.
    """
    parser = ArgumentParser(description="ClickHouse functional tests")
    parser.add_argument("-q", "--queries", help="Path to queries dir")
    parser.add_argument("--tmp", help="Path to tmp dir")

    parser.add_argument(
        "-b",
        "--binary",
        default="clickhouse",
        type=find_binary,
        help="Path to clickhouse binary or name of binary in PATH",
    )
    parser.add_argument(
        "-c",
        "--client",
        help="Path to clickhouse-client or name of binary in PATH "
        "(deprecated: this option is useless, use --binary instead)",
    )

    parser.add_argument("--extract_from_config", help="extract-from-config program")
    parser.add_argument(
        "--configclient", help="Client config (if you do not use default ports)"
    )
    parser.add_argument(
        "--configserver",
        default="/etc/clickhouse-server/config.xml",
        help="Preprocessed server config",
    )
    parser.add_argument(
        "-o", "--output", help="Output xUnit compliant test report directory"
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=600,
        help="Timeout for each test case in seconds",
    )
    parser.add_argument(
        "--global_time_limit",
        type=int,
        help="Stop if executing more than specified time (after current test is finished)",
    )
    parser.add_argument("test", nargs="*", help="Optional test case name regex")
    parser.add_argument(
        "-d",
        "--disabled",
        action="store_true",
        default=False,
        help="Also run disabled tests",
    )
    parser.add_argument(
        "--stop",
        action="store_true",
        default=None,
        dest="stop",
        help="Stop on network errors",
    )
    parser.add_argument(
        "--order", default="desc", choices=["asc", "desc", "random"], help="Run order"
    )
    parser.add_argument(
        "--testname",
        action="store_true",
        default=None,
        dest="testname",
        help="Make query with test name before test run",
    )
    parser.add_argument("--hung-check", action="store_true", default=False)
    parser.add_argument("--no-left-queries-check", action="store_true", default=False)
    parser.add_argument("--force-color", action="store_true", default=False)
    parser.add_argument(
        "--database", help="Database for tests (random name test_XXXXXX by default)"
    )
    parser.add_argument(
        "--no-drop-if-fail",
        action="store_true",
        help="Do not drop database for test if test has failed",
    )
    parser.add_argument(
        "--hide-db-name",
        action="store_true",
        help='Replace random database name with "default" in stderr',
    )
    parser.add_argument(
        "--parallel", default="1/1", help="One parallel test run number/total"
    )
    # nargs="?" means a bare "-j" (without a value) is stored as None.
    parser.add_argument(
        "-j", "--jobs", default=1, nargs="?", type=int, help="Run all tests in parallel"
    )
    parser.add_argument(
        "--test-runs",
        default=1,
        nargs="?",
        type=int,
        help="Run each test many times (useful for e.g. flaky check)",
    )
    parser.add_argument(
        "-U",
        "--unified",
        default=3,
        type=int,
        help="output NUM lines of unified context",
    )
    parser.add_argument(
        "-r",
        "--server-check-retries",
        default=180,
        type=int,
        help="Num of tries to execute SELECT 1 before tests started",
    )
    parser.add_argument("--db-engine", help="Database engine name")
    parser.add_argument(
        "--replicated-database",
        action="store_true",
        default=False,
        help="Run tests with Replicated database engine",
    )
    parser.add_argument(
        "--fast-tests-only",
        action="store_true",
        default=False,
        help='Run only fast tests (the tests without the "no-fasttest" tag)',
    )
    parser.add_argument(
        "--no-stateless", action="store_true", help="Disable all stateless tests"
    )
    parser.add_argument(
        "--no-stateful", action="store_true", help="Disable all stateful tests"
    )
    parser.add_argument("--skip", nargs="+", help="Skip these tests")
    parser.add_argument(
        "--sequential",
        nargs="+",
        help="Run these tests sequentially even if --parallel specified",
    )
    parser.add_argument(
        "--no-long", action="store_true", dest="no_long", help="Do not run long tests"
    )
    parser.add_argument(
        "--client-option", nargs="+", help="Specify additional client argument"
    )
    parser.add_argument(
        "--print-time", action="store_true", dest="print_time", help="Print test time"
    )
    parser.add_argument(
        "--check-zookeeper-session",
        action="store_true",
        help="Check ZooKeeper session uptime to determine if failed test should be retried",
    )
    parser.add_argument(
        "--s3-storage",
        action="store_true",
        default=False,
        help="Run tests over s3 storage",
    )
    parser.add_argument(
        "--distributed-cache",
        action="store_true",
        default=False,
        help="Run tests with enabled distributed cache",
    )
    parser.add_argument(
        "--azure-blob-storage",
        action="store_true",
        default=False,
        help="Run tests over azure blob storage",
    )
    parser.add_argument(
        "--no-random-settings",
        action="store_true",
        default=False,
        help="Disable settings randomization",
    )
    parser.add_argument(
        "--no-random-merge-tree-settings",
        action="store_true",
        default=False,
        help="Disable MergeTree settings randomization",
    )
    # "%%" renders as a literal "%" in the help output.
    parser.add_argument(
        "--run-by-hash-num",
        type=int,
        help="Run tests matching crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )
    parser.add_argument(
        "--run-by-hash-total",
        type=int,
        help="Total test groups for crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )

    parser.add_argument(
        "--show-whitespaces-in-diff",
        action="store_true",
        help="Display $ characters after line with trailing whitespaces in diff output",
    )

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--cloud",
        action="store_true",
        default=None,
        dest="cloud",
        help="Run only tests that are supported in ClickHouse Cloud environment",
    )

    group.add_argument(
        "--no-cloud",
        action="store_false",
        default=None,
        dest="cloud",
        help="Run all the tests, including the ones not supported in ClickHouse Cloud environment",
    )
    parser.set_defaults(cloud=False)

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--private",
        action="store_true",
        default=None,
        dest="private",
        help="Run only tests that are supported in the private build",
    )

    group.add_argument(
        "--no-private",
        action="store_false",
        default=None,
        dest="private",
        help="Run all the tests, including the ones not supported in the private build",
    )
    # Only used to skip tests via "../queries-no-private-tests.txt", so it's fine to keep it enabled by default
    parser.set_defaults(private=True)

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--zookeeper",
        action="store_true",
        default=None,
        dest="zookeeper",
        help="Run zookeeper related tests",
    )
    group.add_argument(
        "--no-zookeeper",
        action="store_false",
        default=None,
        dest="zookeeper",
        help="Do not run zookeeper related tests",
    )

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--shard",
        action="store_true",
        default=None,
        dest="shard",
        help="Run sharding related tests "
        "(required to clickhouse-server listen 127.0.0.2 127.0.0.3)",
    )
    group.add_argument(
        "--no-shard",
        action="store_false",
        default=None,
        dest="shard",
        help="Do not run shard related tests",
    )

    # TODO: Remove upgrade-check option after release 24.3 and use
    # ignore_drop_queries_probability option in stress.py as in stress tests
    # NOTE(review): this option is registered on the --shard/--no-shard
    # mutually exclusive group, so it cannot be combined with either of them;
    # that looks unintentional -- confirm before changing.
    group.add_argument(
        "--upgrade-check",
        action="store_true",
        help="Run tests for further server upgrade testing by ignoring all "
        "drop queries in tests for collecting data from new version of server",
    )
    parser.add_argument(
        "--secure",
        action="store_true",
        default=False,
        help="Use secure connection to connect to clickhouse-server",
    )
    parser.add_argument(
        "--max-failures-chain",
        default=20,
        type=int,
        help="Max number of failed tests in a row (stop tests if higher)",
    )
    parser.add_argument(
        "--report-coverage",
        action="store_true",
        default=False,
        help="Check what high-level server components were covered by tests",
    )
    # store_true combined with default=True: the value is True whether or not
    # the flag is passed (same for the next option).
    parser.add_argument(
        "--collect-per-test-coverage",
        action="store_true",
        default=True,
        help="Create `system.coverage_log` table on the server and collect information about low-level code coverage on a per test basis there",
    )
    parser.add_argument(
        "--reset-coverage-before-every-test",
        action="store_true",
        default=True,
        help="Collect isolated test coverage for every test instead of a cumulative. Useful only when tests are run sequentially.",
    )
    parser.add_argument(
        "--report-logs-stats",
        action="store_true",
        default=False,
        help="Report statistics about log messages",
    )
    parser.add_argument(
        "--no-parallel-replicas",
        action="store_true",
        default=False,
        help="Do not include tests that are not supported with parallel replicas feature",
    )
    # The default comes from the environment: any non-empty string value of
    # the variable is truthy (same for the next option).
    parser.add_argument(
        "--replace-replicated-with-shared",
        action="store_true",
        default=os.environ.get("REPLACE_RMT_WITH_SMT", False),
        help="Replace ReplicatedMergeTree engine with SharedMergeTree",
    )
    parser.add_argument(
        "--replace-non-replicated-with-shared",
        action="store_true",
        default=os.environ.get("REPLACE_MT_WITH_SMT", False),
        help="Replace ordinary MergeTree engine with SharedMergeTree",
    )
    parser.add_argument(
        "--client-log",
        default="./client.fatal.log",
        help="Path to file for fatal logs from client",
    )

    return parser.parse_args()
|
|
|
|
|
|
class Terminated(KeyboardInterrupt):
    """Signals that the test runner was asked to terminate.

    As a ``KeyboardInterrupt`` subclass it is not caught by
    ``except Exception`` handlers.
    """
|
|
|
|
|
|
def signal_handler(sig, frame):
    """Signal handler that converts a received signal into ``Terminated``.

    ``sig`` is included in the exception message; ``frame`` is accepted to
    match the handler signature expected by ``signal.signal`` and is unused.
    """
    message = f"Terminated with {sig} signal"
    raise Terminated(message)
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: set up process-wide state, parse and normalize the
    # command line arguments, export the connection settings through the
    # environment, then hand over to main().
    stop_time = None
    exit_code = multiprocessing.Value("i", 0)
    server_died = multiprocessing.Event()
    multiprocessing_manager = multiprocessing.Manager()
    restarted_tests = multiprocessing_manager.list()

    # Move to a new process group and kill it at exit so that we don't have any
    # infinite tests processes left
    # (new process group is required to avoid killing some parent processes)
    os.setpgid(0, 0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    try:
        args = parse_args()
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)

    if args.queries and not os.path.isdir(args.queries):
        print(
            f"Cannot access the specified directory with queries ({args.queries})",
            file=sys.stderr,
        )
        sys.exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        args.queries = "queries"

        if not os.path.isdir(args.queries):
            # If we're running from the repo
            args.queries = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), "queries"
            )

        if not os.path.isdir(args.queries):
            # Next we're going to try some system directories, don't write 'stdout' files into them.
            if args.tmp is None:
                args.tmp = "/tmp/clickhouse-test"

            args.queries = "/usr/local/share/clickhouse-test/queries"

            if not os.path.isdir(args.queries):
                args.queries = "/usr/share/clickhouse-test/queries"

                if not os.path.isdir(args.queries):
                    print(
                        "Failed to detect path to the queries directory. Please specify it with "
                        "'--queries' option.",
                        file=sys.stderr,
                    )
                    sys.exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.tmp is None:
        args.tmp = args.queries

    if args.client:
        print(
            "WARNING: --client option is deprecated and will be removed in the future, use --binary instead",
            file=sys.stderr,
        )

    # The user-supplied --client value is ignored: the client command is
    # always derived from --binary.
    args.client = find_clickhouse_command(args.binary, "client")

    if args.extract_from_config:
        print(
            "WARNING: --extract_from_config option is deprecated and will be removed in the future",
            file=sys.stderr,
        )
    args.extract_from_config = find_clickhouse_command(
        args.binary, "extract-from-config"
    )

    if args.configclient:
        args.client += " --config-file=" + args.configclient

    tcp_host = os.getenv("CLICKHOUSE_HOST")
    if tcp_host is not None:
        args.tcp_host = tcp_host
        args.client += f" --host={tcp_host}"
    else:
        args.tcp_host = "localhost"

    tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
    if tcp_port is not None:
        args.tcp_port = int(tcp_port)
        args.client += f" --port={tcp_port}"
    else:
        args.tcp_port = 9440 if args.secure else 9000
        if args.secure:
            os.environ["CLICKHOUSE_PORT_TCP"] = str(args.tcp_port)

    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
    if http_port is not None:
        args.http_port = int(http_port)
    else:
        args.http_port = 8443 if args.secure else 8123
        os.environ["CLICKHOUSE_PORT_HTTP"] = str(args.http_port)
        if args.secure and os.getenv("CLICKHOUSE_PORT_HTTP_PROTO") is None:
            os.environ["CLICKHOUSE_PORT_HTTP_PROTO"] = "https"

    client_database = os.getenv("CLICKHOUSE_DATABASE")
    if client_database is not None:
        args.client += f" --database={client_database}"
        args.client_database = client_database
    else:
        args.client_database = "default"

    if args.upgrade_check:
        args.client += " --fake-drop"

    if args.client_option or args.secure:
        # Set options for client.
        # get_additional_client_options() already starts with the current
        # value of CLICKHOUSE_CLIENT_OPT, so its result is assigned rather
        # than appended: appending repeated every pre-existing option.
        client_opt = get_additional_client_options(args)
        if args.secure:
            client_opt += " --secure "
        os.environ["CLICKHOUSE_CLIENT_OPT"] = client_opt

        # Set options for curl
        if "CLICKHOUSE_URL_PARAMS" in os.environ:
            os.environ["CLICKHOUSE_URL_PARAMS"] += "&"
        else:
            os.environ["CLICKHOUSE_URL_PARAMS"] = ""

        client_options_query_str = get_additional_client_options_url(args)
        args.client_options_query_str = client_options_query_str + "&"
        args.client_options_query_str += os.environ["CLICKHOUSE_URL_PARAMS"]
        os.environ["CLICKHOUSE_URL_PARAMS"] += client_options_query_str
    else:
        args.client_options_query_str = ""

    # "-j" given without a value is parsed as None: use every CPU.
    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    if args.db_engine and args.db_engine == "Ordinary":
        MESSAGES_TO_RETRY.append(" locking attempt on ")

    if args.replace_replicated_with_shared:
        args.s3_storage = True

    main(args)
|