ClickHouse/tests/clickhouse-test

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

3537 lines
122 KiB
Plaintext
Raw Normal View History

2020-10-02 16:54:07 +00:00
#!/usr/bin/env python3
# pylint: disable=too-many-return-statements
# pylint: disable=global-variable-not-assigned
2022-04-28 11:26:49 +00:00
# pylint: disable=too-many-lines
2023-01-17 09:17:51 +00:00
# pylint: disable=anomalous-backslash-in-string
# pylint: disable=protected-access
2024-02-26 17:46:15 +00:00
import copy
import enum
import tempfile
2024-02-26 17:46:15 +00:00
import glob
# Not requests, to avoid requiring extra dependency.
import http.client
import itertools
2024-02-26 17:46:15 +00:00
import json
import math
import multiprocessing
import os
import os.path
import platform
2024-02-26 17:46:15 +00:00
import random
import re
2024-02-26 17:46:15 +00:00
import shutil
import signal
import socket
import string
import subprocess
import sys
import traceback
import urllib.parse
2022-04-27 11:02:45 +00:00
# for crc32
import zlib
from argparse import ArgumentParser
2023-12-18 18:16:50 +00:00
from datetime import datetime, timedelta
2016-09-02 16:26:09 +00:00
from errno import ESRCH
2024-02-26 17:46:15 +00:00
from subprocess import PIPE, Popen
from time import sleep, time
from typing import Dict, List, Optional, Set, Tuple, Union
try:
2022-04-28 11:26:49 +00:00
import termcolor # type: ignore
except ImportError:
termcolor = None
USE_JINJA = True
try:
import jinja2
except ImportError:
USE_JINJA = False
2022-04-27 11:02:45 +00:00
print("WARNING: jinja2 not installed! Template tests will be skipped.")
2019-03-13 16:47:02 +00:00
# Substrings that need_retry() looks for in a test's stdout and stderr; a
# match marks the failure as one that should be retried.
MESSAGES_TO_RETRY = [
    "ConnectionPoolWithFailover: Connection failed at try",
    "DB::Exception: New table appeared in database being dropped or detached. Try again",
    "is already started to be removing by another replica right now",
    # This is from LSan, and it indicates its own internal problem:
    "Unable to get registers from thread",
]

# Compared against the run counter in TestResult.check_if_need_retry().
MAX_RETRIES = 3

TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]

# One to four dot-separated groups of digits, e.g. "23" or "23.8.1.1".
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"

# Interpolated into the FailureReason.TOO_LONG message.
TEST_MAX_RUN_TIME_IN_SECONDS = 180
class SharedEngineReplacer:
    """Context manager that rewrites MergeTree engines in a file in place.

    On construction the file is backed up to a temporary path and rewritten so
    that Replicated*MergeTree engines (and, optionally, plain *MergeTree
    engines) become their Shared*MergeTree counterparts. Leaving the ``with``
    block moves the backup over the file again, restoring the original.

    Nothing is touched when neither replacement is requested or when the file
    name contains "shared_merge_tree".
    """

    ENGINES_NON_REPLICATED_REGEXP = r"[ =]((Collapsing|VersionedCollapsing|Summing|Replacing|Aggregating|)MergeTree\(?\)?)"
    ENGINES_MAPPING_REPLICATED = [
        ("ReplicatedMergeTree", "SharedMergeTree"),
        ("ReplicatedCollapsingMergeTree", "SharedCollapsingMergeTree"),
        (
            "ReplicatedVersionedCollapsingMergeTree",
            "SharedVersionedCollapsingMergeTree",
        ),
        ("ReplicatedSummingMergeTree", "SharedSummingMergeTree"),
        ("ReplicatedReplacingMergeTree", "SharedReplacingMergeTree"),
        ("ReplicatedAggregatingMergeTree", "SharedAggregatingMergeTree"),
    ]
    NEW_SYNTAX_REPLICATED_MERGE_TREE_RE = (
        r"Replicated[a-zA-Z]*MergeTree\((\\?'.*\\?')?,?(\\?'.*\\?')?[a-zA-Z, _}{]*\)"
    )
    OLD_SYNTAX_OR_ARGUMENTS_RE = r"Tree\(.*[0-9]+.*\)"

    def _check_replicad_new_syntax(self, line):
        """Return True if *line* matches NEW_SYNTAX_REPLICATED_MERGE_TREE_RE."""
        return re.search(self.NEW_SYNTAX_REPLICATED_MERGE_TREE_RE, line) is not None

    def _check_old_syntax_or_arguments(self, line):
        """Return True if *line* has a ``Tree(...)`` call with a digit inside."""
        return re.search(self.OLD_SYNTAX_OR_ARGUMENTS_RE, line) is not None

    @staticmethod
    def _is_comment_line(line):
        """Return True for lines starting with SELECT/select.

        Despite the name, this does not detect SQL comments; lines for which
        it returns True are copied to the output unchanged.
        """
        return line.startswith(("SELECT", "select"))

    @staticmethod
    def _is_create_query(line):
        """Return True if *line* starts with CREATE or ENGINE (upper or lower case)."""
        return line.startswith(("CREATE", "create", "ENGINE", "engine"))

    def _replace_non_replicated(self, line, escape_quotes, use_random_path):
        """Replace a plain *MergeTree engine in *line* with Shared*MergeTree.

        The ZooKeeper-style path argument is derived from the file's base name
        (underscores become "/"), the current PID and, when *use_random_path*
        is set, a random number in [1, 1000]. With *escape_quotes* the single
        quotes around the arguments are backslash-escaped.

        The line is returned unchanged when it has no such engine or when
        OLD_SYNTAX_OR_ARGUMENTS_RE matches it.
        """
        groups = re.search(self.ENGINES_NON_REPLICATED_REGEXP, line)
        if groups is None or self._check_old_syntax_or_arguments(line):
            return line

        non_replicated_engine = groups.groups()[0]
        basename_no_ext = os.path.splitext(os.path.basename(self.file_name))[0]

        path_parts = [basename_no_ext.replace("_", "/"), str(os.getpid())]
        if use_random_path:
            path_parts.append(str(random.randint(1, 1000)))
        shared_path = "/" + os.path.join(*path_parts)

        if escape_quotes:
            engine_args = f"(\\'{shared_path}\\', \\'1\\')"
        else:
            engine_args = f"('{shared_path}', '1')"

        shared_engine = "Shared" + non_replicated_engine.replace("()", "") + engine_args
        return line.replace(non_replicated_engine, shared_engine)

    def _need_to_replace_something(self):
        """Return True if any replacement is enabled and allowed for this file."""
        return (
            self.replace_replicated or self.replace_non_replicated
        ) and "shared_merge_tree" not in self.file_name

    def _has_show_create_table(self):
        """Return True if the file contains "show create table" (any case)."""
        with open(self.file_name, "r", encoding="utf-8") as f:
            return re.search("show create table", f.read(), re.IGNORECASE) is not None

    def _rewrite_file(self, reference_file, use_random_path):
        """Write the transformed copy of the backup over ``self.file_name``."""
        with open(self.file_name, "w", newline="", encoding="utf-8") as modified:
            with open(self.temp_file_path, "r", newline="", encoding="utf-8") as source:
                for line in source:
                    if self._is_comment_line(line) or (
                        reference_file and not self._is_create_query(line)
                    ):
                        modified.write(line)
                        continue

                    if self.replace_replicated:
                        for (
                            engine_from,
                            engine_to,
                        ) in SharedEngineReplacer.ENGINES_MAPPING_REPLICATED:
                            if engine_from in line and (
                                self._check_replicad_new_syntax(line)
                                or engine_from + " " in line
                                or engine_from + ";" in line
                            ):
                                line = line.replace(engine_from, engine_to)
                                break

                    if self.replace_non_replicated:
                        line = self._replace_non_replicated(
                            line, reference_file, use_random_path
                        )

                    modified.write(line)

    def __init__(
        self, file_name, replace_replicated, replace_non_replicated, reference_file
    ):
        """Back up *file_name* and rewrite it in place.

        *reference_file* selects reference-file handling: only CREATE/ENGINE
        lines are transformed, quotes are escaped, and no random path
        component is used.
        """
        self.file_name = file_name
        self.temp_file_path = get_temp_file_path()
        self.replace_replicated = replace_replicated
        self.replace_non_replicated = replace_non_replicated

        if not self._need_to_replace_something():
            return

        # Only scan the file once we know it is going to be rewritten.
        use_random_path = not reference_file and not self._has_show_create_table()

        shutil.copyfile(self.file_name, self.temp_file_path)
        shutil.copymode(self.file_name, self.temp_file_path)

        try:
            self._rewrite_file(reference_file, use_random_path)
        except BaseException:
            # __exit__ is never called when __init__ raises, so put the
            # original content back here instead of leaving a truncated file.
            shutil.move(self.temp_file_path, self.file_name)
            raise

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        """Restore the original file from the backup, if one was made."""
        if not self._need_to_replace_something():
            return
        shutil.move(self.temp_file_path, self.file_name)
def get_temp_file_path():
    """Return a candidate file path inside the default temporary directory.

    Only the name is generated; this function does not create the file.
    Relies on private helpers of the tempfile module.
    """
    directory = tempfile._get_default_tempdir()
    candidate = next(tempfile._get_candidate_names())
    return os.path.join(directory, candidate)
2024-02-26 17:46:15 +00:00
def stringhash(s: str) -> int:
    """Return the CRC32 checksum of *s* encoded as UTF-8.

    The builtin hash() is consistent only within a single process invocation
    (https://stackoverflow.com/a/42089311), so it is not used here.
    """
    encoded = s.encode("utf-8")
    return zlib.crc32(encoded)
2024-01-15 05:26:04 +00:00
2024-01-15 04:40:03 +00:00
def read_file_as_binary_string(file_path):
    """Return the whole content of *file_path* as a bytes object."""
    with open(file_path, "rb") as stream:
        return stream.read()
2024-01-15 05:26:04 +00:00
2023-02-06 19:27:46 +00:00
# First and last lines of the log
def trim_for_log(s, max_lines=10000):
    """Return *s* shortened to at most *max_lines* lines plus a separator.

    Falsy input (None, "") is returned as is. Otherwise the text is split
    with str.splitlines() and re-joined with "\\n". If it has more than
    *max_lines* lines, only the first ``max_lines // 2`` and the remaining
    last lines are kept, with a separator line in between that states how
    many lines were hidden.
    """
    if not s:
        return s

    lines = s.splitlines()
    max_lines = max(max_lines, 0)
    hidden = len(lines) - max_lines
    if hidden <= 0:
        return "\n".join(lines)

    head = max_lines // 2
    tail = max_lines - head
    separator = "-" * 40 + str(hidden) + " lines are hidden" + "-" * 40
    # Slice from an explicit start index: lines[-0:] would keep everything.
    return "\n".join(lines[:head] + [separator] + lines[len(lines) - tail :])
def is_valid_utf_8(fname):
    """Tell whether the full content of file *fname* decodes as UTF-8.

    Errors from opening or reading the file are not handled here.
    """
    with open(fname, "rb") as f:
        raw = f.read()
    try:
        raw.decode("utf-8")
    except UnicodeDecodeError:
        return False
    return True
2024-02-26 17:46:15 +00:00
class TestException(Exception):
    """Exception type declared by this script; adds nothing to Exception."""
class HTTPError(Exception):
    """Error carrying a message and a numeric code.

    Raised by clickhouse_execute_http() with the decoded response body as
    the message and the HTTP status as the code.
    """

    def __init__(self, message=None, code=None):
        super().__init__(message)
        self.code = code
        self.message = message

    def __str__(self):
        return f"Code: {self.code}. {self.message}"
2022-04-27 11:02:45 +00:00
# Helpers to execute queries via HTTP interface.
2022-04-27 11:02:45 +00:00
def clickhouse_execute_http(
    base_args,
    query,
    body=None,
    timeout=30,
    settings=None,
    default_format=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run *query* over the HTTP interface and return the raw response body.

    The request is a POST to ``base_args.tcp_host:base_args.http_port`` with
    the query and settings passed as URL parameters and *body* as the
    request body. The "system" database is always used.

    Parameters:
        base_args: object providing tcp_host, http_port and
            client_options_query_str.
        query: query text.
        body: optional request body.
        timeout: socket timeout; its integer value is also sent as the
            connect/receive/send timeout settings.
        settings: optional dict of extra URL parameters; overrides defaults.
        default_format: optional value for the default_format setting.
        max_http_retries: number of attempts before giving up.
        retry_error_codes: when true, a non-200 response is retried as well.

    Returns:
        The response body as bytes.

    Raises:
        HTTPError: the final response status is not 200.
        Exception: whatever the last attempt raised, once attempts run out.
    """
    # NOTE(review): HTTPS selection reads the module-level ``args`` rather
    # than ``base_args``; kept as is, confirm whether that is intended.
    if args.secure:
        connection_class = http.client.HTTPSConnection
    else:
        connection_class = http.client.HTTPConnection
    client = connection_class(
        host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
    )

    timeout = int(timeout)
    params = {
        "query": query,
        # hung check in stress tests may remove the database,
        # hence we should use 'system'.
        "database": "system",
        "connect_timeout": timeout,
        "receive_timeout": timeout,
        "send_timeout": timeout,
        "http_connection_timeout": timeout,
        "http_receive_timeout": timeout,
        "http_send_timeout": timeout,
        "output_format_parallel_formatting": 0,
    }
    if settings is not None:
        params.update(settings)
    if default_format is not None:
        params["default_format"] = default_format

    try:
        for i in range(max_http_retries):
            try:
                client.request(
                    "POST",
                    f"/?{base_args.client_options_query_str}{urllib.parse.urlencode(params)}",
                    body=body,
                )
                res = client.getresponse()
                data = res.read()
                if res.status == 200 or (not retry_error_codes):
                    break
            except Exception:
                if i == max_http_retries - 1:
                    raise
                # Drop the broken connection; the next request reconnects.
                client.close()
                sleep(i + 1)
    finally:
        # The body has been read in full by now, so release the socket.
        client.close()

    if res.status != 200:
        raise HTTPError(data.decode(), res.status)

    return data
2022-04-27 11:02:45 +00:00
def clickhouse_execute(
    base_args,
    query,
    body=None,
    timeout=30,
    settings=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run *query* via clickhouse_execute_http() and return the stripped body.

    All arguments are forwarded unchanged; no default format is set.
    """
    response = clickhouse_execute_http(
        base_args,
        query,
        body=body,
        timeout=timeout,
        settings=settings,
        max_http_retries=max_http_retries,
        retry_error_codes=retry_error_codes,
    )
    return response.strip()
2022-04-27 11:02:45 +00:00
def clickhouse_execute_json(
    base_args, query, timeout=60, settings=None, max_http_retries=5
):
    """Run *query* with the JSONEachRow format and return the parsed rows.

    Returns a list with one parsed JSON value per response line, or None
    when the response is empty.
    """
    raw = clickhouse_execute_http(
        base_args,
        query,
        body=None,
        timeout=timeout,
        settings=settings,
        default_format="JSONEachRow",
        max_http_retries=max_http_retries,
    )
    if not raw:
        return None
    return [json.loads(line) for line in raw.strip().splitlines()]
2024-07-03 15:53:05 +00:00
def stop_tests():
    """Send SIGTERM to every process in this process's group.

    SIGTERM is ignored in this process while the signal is being sent and is
    reset to the default disposition afterwards, even if sending fails.
    """
    # send signal to all processes in group to avoid hung check triggering
    # (to avoid terminating clickhouse-test itself, the signal should be ignored)
    print("Sending signals")
    signal.signal(signal.SIGTERM, signal.SIG_IGN)
    try:
        os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
    finally:
        # Do not leave SIGTERM ignored if killpg() raised.
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
    print("Sending signals DONE")
2024-07-03 15:53:05 +00:00
2021-02-15 10:26:34 +00:00
def get_db_engine(args, database_name):
    """Return the clause to append to CREATE DATABASE for *database_name*.

    A replicated setup takes priority over an explicit ``args.db_engine``;
    with neither, the empty string is returned.
    """
    engine_clause = ""  # Will use default engine
    if args.replicated_database:
        engine_clause = (
            " ON CLUSTER test_cluster_database_replicated "
            f"ENGINE=Replicated('/test/clickhouse/db/{database_name}', "
            "'{shard}', '{replica}')"
        )
    elif args.db_engine:
        engine_clause = " ENGINE=" + args.db_engine
    return engine_clause
2022-08-12 09:28:16 +00:00
def get_create_database_settings(args, testcase_args):
    """Build the settings dict sent along with CREATE DATABASE.

    ``log_comment`` is set to the test case base name when *testcase_args*
    is given; the Ordinary engine additionally gets
    ``allow_deprecated_database_ordinary``.
    """
    settings = {}
    if testcase_args:
        settings["log_comment"] = testcase_args.testcase_basename
    if args.db_engine == "Ordinary":
        settings["allow_deprecated_database_ordinary"] = 1
    return settings
def get_zookeeper_session_uptime(args):
    """Return zookeeperSessionUptime() as an int, or None on any failure.

    For a replicated database the minimum over all replicas of
    'test_cluster_database_replicated' is taken.
    """
    try:
        if args.replicated_database:
            query = """
                SELECT min(materialize(zookeeperSessionUptime()))
                FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
                """
        else:
            query = "SELECT zookeeperSessionUptime()"
        return int(clickhouse_execute(args, query))
    except Exception:
        return None
def need_retry(args, stdout, stderr, total_time):
    """Decide whether a test run should be repeated.

    True when the ZooKeeper session check is enabled and the session uptime
    is shorter than the test's run time, or when stdout or stderr contains
    one of MESSAGES_TO_RETRY.
    """
    if args.check_zookeeper_session:
        # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
        # instead of "Session expired" or "Connection loss"
        # Retry if session was expired during test execution.
        # If ZooKeeper is configured, then it's more reliable than checking stderr,
        # but the following condition is always true if ZooKeeper is not configured.
        session_uptime = get_zookeeper_session_uptime(args)
        session_restarted = (
            session_uptime is not None and session_uptime < math.ceil(total_time)
        )
        if session_restarted:
            return True

    return any(
        msg in output for output in (stdout, stderr) for msg in MESSAGES_TO_RETRY
    )
2019-03-13 16:47:02 +00:00
2023-12-18 18:16:50 +00:00
def get_processlist_size(args):
    """Return the row count of system.processes as an int.

    Queries mentioning system.processes are excluded from the count. For a
    replicated database all replicas of 'test_cluster_database_replicated'
    are counted.
    """
    if args.replicated_database:
        query = """
            SELECT
                count()
            FROM clusterAllReplicas('test_cluster_database_replicated', system.processes)
            WHERE query NOT LIKE '%system.processes%'
            """
    else:
        query = """
            SELECT
                count()
            FROM system.processes
            WHERE query NOT LIKE '%system.processes%'
            """
    answer = clickhouse_execute(args, query)
    return int(answer.strip())
2023-12-18 19:50:58 +00:00
2023-12-18 18:16:50 +00:00
def get_processlist_with_stacktraces(args):
    """Return system.processes joined with per-thread stack traces.

    The result is the stripped response body in Vertical format, ordered by
    elapsed time descending. Introspection functions are enabled for the
    query and the timeout is 120 seconds.
    """
    if args.replicated_database:
        query = """
            SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
            -- NOTE: view() here to do JOIN on shards, instead of initiator
            FROM clusterAllReplicas('test_cluster_database_replicated', view(
                SELECT
                    p.*,
                    arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                        x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                        s.trace), '\n') AS stacktrace
                    )) AS stacktraces
                FROM system.processes p
                JOIN system.stack_trace s USING (query_id)
                WHERE query NOT LIKE '%system.processes%'
                GROUP BY p.*
            ))
            ORDER BY elapsed DESC FORMAT Vertical
            """
    else:
        query = """
            SELECT
                p.*,
                arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                    x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                    s.trace), '\n') AS stacktrace
                )) AS stacktraces
            FROM system.processes p
            JOIN system.stack_trace s USING (query_id)
            WHERE query NOT LIKE '%system.processes%'
            GROUP BY p.*
            ORDER BY elapsed DESC FORMAT Vertical
            """

    return clickhouse_execute(
        args,
        query,
        settings={
            "allow_introspection_functions": 1,
        },
        timeout=120,
    )
2022-04-27 11:02:45 +00:00
2022-03-14 20:43:34 +00:00
def get_transactions_list(args):
    """Return the rows of system.transactions, or an error string.

    For a replicated database all replicas are queried and a host column is
    added. Any exception is turned into a "Cannot get list of transactions"
    message instead of being raised.
    """
    try:
        query = "select * from system.transactions"
        if args.replicated_database:
            query = (
                "SELECT materialize((hostName(), tcpPort())) as host, * FROM "
                "clusterAllReplicas('test_cluster_database_replicated', system.transactions)"
            )
        return clickhouse_execute_json(args, query)
    except Exception as e:
        return f"Cannot get list of transactions: {e}"
2022-04-27 11:02:45 +00:00
2023-06-08 11:14:43 +00:00
def kill_gdb_if_any():
    """Send SIGTERM to running gdb processes, trying up to five times.

    Does nothing when ``pidof gdb`` reports no process. After a failed
    attempt it sleeps for the attempt's index in seconds before retrying.
    """
    # Check if we have running gdb.
    if subprocess.call("pidof gdb", shell=True) != 0:
        return

    for attempt in range(5):
        status = subprocess.call("kill -TERM $(pidof gdb)", shell=True, timeout=30)
        if status == 0:
            break
        sleep(attempt)
2023-06-12 11:45:00 +00:00
2020-03-23 18:17:07 +00:00
# collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid):
    """Return gdb backtraces of all threads of *server_pid*, or None on error.

    Any exception is printed and swallowed.
    """
    try:
        # We could attach gdb to clickhouse-server before running some tests
        # to print stacktraces of all crashes even if clickhouse cannot print it for some reason.
        # We should kill existing gdb if any before starting new one.
        kill_gdb_if_any()

        raw = subprocess.check_output(
            f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}",
            shell=True,
        )
        return raw.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from gdb: {e}")
    return None
2020-03-23 18:17:07 +00:00
# collect server stacktraces from system.stack_trace table
def get_stacktraces_from_clickhouse(args):
    """Return system.stack_trace rendered in Vertical format, or None on error.

    The query is run through the command-line client (``args.client``) with
    introspection functions enabled. For a replicated database all replicas
    are queried. Errors from running the client are printed and swallowed.
    """
    settings_str = " ".join(
        [
            get_additional_client_options(args),
            "--allow_introspection_functions=1",
            "--skip_unavailable_shards=1",
        ]
    )
    command_prefix = f"{args.client} {settings_str} --query "

    # Shared between both variants of the query.
    trace_expr = (
        "arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
        "arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace_str "
    )

    if args.replicated_database:
        command = (
            command_prefix
            + '"SELECT materialize((hostName(), tcpPort())) as host, thread_name, thread_id, query_id, trace, '
            + trace_expr
            + "FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
            + 'ORDER BY host, thread_id FORMAT Vertical"'
        )
    else:
        command = (
            command_prefix
            + '"SELECT thread_name, thread_id, query_id, trace, '
            + trace_expr
            + 'FROM system.stack_trace FORMAT Vertical"'
        )

    try:
        raw = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
        )
        return raw.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
    return None
def print_stacktraces() -> None:
    """Print server stack traces, preferring gdb over system.stack_trace.

    gdb is used only when the server PID is known and the database is not
    replicated; output that is missing or shorter than 1000 characters is
    discarded. The fallback is a query through the client. If neither
    source yields anything, a message saying the server could not be
    located is printed.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(
            f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}"
        )
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None on failure, so guard len().
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(args)

    if bt is not None:
        print(bt)
        return

    print(
        colored(
            f"\nUnable to locate ClickHouse server process listening at TCP port "
            f"{args.tcp_port}. It must have crashed or exited prematurely!",
            args,
            "red",
            attrs=["bold"],
        )
    )
def get_server_pid():
    """Return the server PID as an int, or None if it cannot be determined.

    ``lsof`` on the configured TCP port is tried first, then ``pidof``.
    A failing command is reported with print() and the next one is tried.
    """
    # lsof does not work in stress tests for some reason
    cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | sed 's/^p//p;d'"
    cmd_pidof = "pidof -s clickhouse-server"

    output = None
    for cmd in (cmd_lsof, cmd_pidof):
        try:
            output = subprocess.check_output(
                cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True
            )
            if output:
                return int(output)
        except Exception as e:
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
def colored(text, args, color=None, on_color=None, attrs=None):
    """Return *text* wrapped by termcolor.colored(), or unchanged.

    Coloring is applied only when termcolor is available and either stdout
    is a terminal or ``args.force_color`` is set.
    """
    use_color = termcolor and (sys.stdout.isatty() or args.force_color)
    if not use_color:
        return text
    return termcolor.colored(text, color, on_color, attrs)
@enum.unique
class TestStatus(enum.Enum):
    """Status of a test case; each member's value equals its name."""

    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    OK = "OK"
    SKIPPED = "SKIPPED"
class FailureReason(enum.Enum):
    """Reason attached to a test result.

    Members are grouped by the status they accompany (FAIL, SKIPPED,
    UNKNOWN); each value is a human-readable text.
    """

    # FAIL reasons
    TIMEOUT = "Timeout!"
    SERVER_DIED = "server died"
    EXIT_CODE = "return code: "
    STDERR = "having stderror: "
    EXCEPTION = "having exception in stdout: "
    RESULT_DIFF = "result differs with reference: "
    TOO_LONG = (
        f"Test runs too long (> {TEST_MAX_RUN_TIME_IN_SECONDS}s). Make it faster."
    )
    INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

    # SKIPPED reasons
    NOT_SUPPORTED_IN_CLOUD = "not supported in cloud environment"
    NOT_SUPPORTED_IN_PRIVATE = "not supported in private build"
    DISABLED = "disabled"
    SKIP = "skip"
    NO_JINJA = "no jinja"
    NO_ZOOKEEPER = "no zookeeper"
    NO_SHARD = "no shard"
    FAST_ONLY = "running fast tests only"
    NO_LONG = "not running long tests"
    REPLICATED_DB = "replicated-database"
    NON_ATOMIC_DB = "database engine not Atomic"
    OBJECT_STORAGE = "object-storage"
    S3_STORAGE = "s3-storage"
    AZURE_BLOB_STORAGE = "azure-blob-storage"
    BUILD = "not running for current build"
    NO_PARALLEL_REPLICAS = "smth in not supported with parallel replicas"
    SHARED_MERGE_TREE = "no-shared-merge-tree"

    # UNKNOWN reasons
    NO_REFERENCE = "no reference file"
    INTERNAL_ERROR = "Test internal error: "
def threshold_generator(always_on_prob, always_off_prob, min_val, max_val):
    """Build a zero-argument generator of random threshold values.

    Each call of the returned function yields *min_val* with probability
    *always_on_prob*, *max_val* with probability *always_off_prob*, and
    otherwise a random value between the two: an integer when both bounds
    are ints, a float otherwise.
    """

    def gen():
        roll = random.random()
        if roll <= always_on_prob:
            return min_val
        if roll <= always_on_prob + always_off_prob:
            return max_val

        both_int = isinstance(min_val, int) and isinstance(max_val, int)
        pick = random.randint if both_int else random.uniform
        return pick(min_val, max_val)

    return gen
# To keep dependency list as short as possible, tzdata is not used here (to
# avoid try/except block for import)
def get_localzone():
    """Return the local time zone name.

    The TZ environment variable wins when it is set (even to an empty
    string). Otherwise the last two path components of the target of the
    /etc/localtime symlink are joined with "/".

    Raises:
        OSError: TZ is unset and /etc/localtime cannot be read as a symlink.
    """
    tz = os.environ.get("TZ")
    if tz is not None:
        return tz
    # Resolved only when needed: as a getenv() default it would be evaluated,
    # and could raise, even when TZ is set.
    return "/".join(os.readlink("/etc/localtime").split("/")[-2:])
class SettingsRandomizer:
    """Table of settings with a random-value generator for each.

    ``settings`` maps a setting name to a zero-argument callable that
    produces a random value for it.
    """

    settings = {
        "max_insert_threads": lambda: (
            12 if random.random() < 0.03 else random.randint(1, 3)
        ),
        "group_by_two_level_threshold": threshold_generator(0.2, 0.2, 1, 1000000),
        "group_by_two_level_threshold_bytes": threshold_generator(
            0.2, 0.2, 1, 50000000
        ),
        "distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
        "fsync_metadata": lambda: random.randint(0, 1),
        "output_format_parallel_formatting": lambda: random.randint(0, 1),
        "input_format_parallel_parsing": lambda: random.randint(0, 1),
        "min_chunk_bytes_for_parallel_parsing": lambda: max(
            1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))
        ),
        "max_read_buffer_size": lambda: random.randint(500000, 1048576),
        "prefer_localhost_replica": lambda: random.randint(0, 1),
        "max_block_size": lambda: random.randint(8000, 100000),
        "max_joined_block_size_rows": lambda: random.randint(8000, 100000),
        "max_threads": lambda: 32 if random.random() < 0.03 else random.randint(1, 3),
        "optimize_append_index": lambda: random.randint(0, 1),
        "optimize_if_chain_to_multiif": lambda: random.randint(0, 1),
        "optimize_if_transform_strings_to_enum": lambda: random.randint(0, 1),
        "optimize_read_in_order": lambda: random.randint(0, 1),
        "optimize_or_like_chain": lambda: random.randint(0, 1),
        "optimize_substitute_columns": lambda: random.randint(0, 1),
        "enable_multiple_prewhere_read_steps": lambda: random.randint(0, 1),
        "read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
        "optimize_aggregation_in_order": lambda: random.randint(0, 1),
        "aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
        "use_uncompressed_cache": lambda: random.randint(0, 1),
        "min_bytes_to_use_direct_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "min_bytes_to_use_mmap_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "local_filesystem_read_method": lambda: random.choice(
            # Allow to use uring only when running on Linux
            ["read", "pread", "mmap", "pread_threadpool", "io_uring"]
            if platform.system().lower() == "linux"
            else ["read", "pread", "mmap", "pread_threadpool"]
        ),
        "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
        "local_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "filesystem_cache_segments_batch_size": lambda: random.choice([0, 3, 10, 50]),
        "read_from_filesystem_cache_if_exists_otherwise_bypass_cache": lambda: random.randint(
            0, 1
        ),
        "throw_on_error_from_cache_on_write_operations": lambda: random.randint(0, 1),
        "remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "allow_prefetched_read_pool_for_remote_filesystem": lambda: random.randint(
            0, 1
        ),
        "filesystem_prefetch_max_memory_usage": lambda: random.choice(
            ["32Mi", "64Mi", "128Mi"]
        ),
        "filesystem_prefetches_limit": lambda: random.choice(
            [0, 10]
        ),  # 0 means unlimited (but anyway limited by prefetch_max_memory_usage)
        "filesystem_prefetch_min_bytes_for_single_read_task": lambda: random.choice(
            ["1Mi", "8Mi", "16Mi"]
        ),
        "filesystem_prefetch_step_marks": lambda: random.choice(
            [0, 50]
        ),  # 0 means 'auto'
        "filesystem_prefetch_step_bytes": lambda: random.choice(
            [0, "100Mi"]
        ),  # 0 means 'auto'
        # "compile_expressions": lambda: random.randint(0, 1), - this setting has a bug: https://github.com/ClickHouse/ClickHouse/issues/51264
        "compile_aggregate_expressions": lambda: random.randint(0, 1),
        "compile_sort_description": lambda: random.randint(0, 1),
        "merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
        "optimize_distinct_in_order": lambda: random.randint(0, 1),
        "max_bytes_before_external_sort": threshold_generator(
            0.3, 0.5, 0, 10 * 1024 * 1024 * 1024
        ),
        "max_bytes_before_external_group_by": threshold_generator(
            0.3, 0.5, 0, 10 * 1024 * 1024 * 1024
        ),
        "max_bytes_before_remerge_sort": lambda: random.randint(1, 3000000000),
        "min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "merge_tree_compact_parts_min_granules_to_multibuffer_read": lambda: random.randint(
            1, 128
        ),
        "optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
        "http_response_buffer_size": lambda: random.randint(0, 10 * 1048576),
        "http_wait_end_of_query": lambda: random.random() > 0.5,
        "enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(
            0, 1
        ),
        "min_count_to_compile_expression": lambda: random.choice([0, 3]),
        "min_count_to_compile_aggregate_expression": lambda: random.choice([0, 3]),
        "min_count_to_compile_sort_description": lambda: random.choice([0, 3]),
        "session_timezone": lambda: random.choice(
            [
                # special non-deterministic around 1970 timezone, see [1].
                #
                #   [1]: https://github.com/ClickHouse/ClickHouse/issues/42653
                "America/Mazatlan",
                "America/Hermosillo",
                "Mexico/BajaSur",
                # These timezones had DST transitions on some unusual dates (e.g. 2000-01-15 12:00:00).
                "Africa/Khartoum",
                "Africa/Juba",
                # server default that is randomized across all timezones
                # NOTE: due to lots of trickery we cannot use empty timezone here, but this should be the same.
                get_localzone(),
            ]
        ),
        "prefer_warmed_unmerged_parts_seconds": lambda: random.randint(0, 10),
        "use_page_cache_for_disks_without_file_cache": lambda: random.random() < 0.7,
        "page_cache_inject_eviction": lambda: random.random() < 0.5,
        "merge_tree_read_split_ranges_into_intersecting_and_non_intersecting_injection_probability": lambda: round(
            random.random(), 2
        ),
        "prefer_external_sort_block_bytes": lambda: random.choice([0, 1, 100000000]),
        "cross_join_min_rows_to_compress": lambda: random.choice([0, 1, 100000000]),
        "cross_join_min_bytes_to_compress": lambda: random.choice([0, 1, 100000000]),
        "min_external_table_block_size_bytes": lambda: random.choice([0, 1, 100000000]),
        "max_parsing_threads": lambda: random.choice([0, 1, 10]),
    }

    @staticmethod
    def get_random_settings(args):
        """Return a dict with a freshly generated value for every setting.

        When BuildFlags.DEBUG is among ``args.build_flags``,
        allow_prefetched_read_pool_for_remote_filesystem is fixed to 0 and
        its generator is not called.
        """
        random_settings = {}
        is_debug = BuildFlags.DEBUG in args.build_flags
        for name, generate in SettingsRandomizer.settings.items():
            pinned_to_zero = (
                is_debug and name == "allow_prefetched_read_pool_for_remote_filesystem"
            )
            random_settings[name] = 0 if pinned_to_zero else generate()
        return random_settings
2022-07-07 22:16:01 +00:00
class MergeTreeSettingsRandomizer:
    """Table of MergeTree settings with a random-value generator for each.

    ``settings`` maps a setting name to a zero-argument callable that
    produces a random value for it.
    """

    settings = {
        "ratio_of_defaults_for_sparse_serialization": threshold_generator(
            0.3, 0.5, 0.0, 1.0
        ),
        "prefer_fetch_merged_part_size_threshold": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "vertical_merge_algorithm_min_rows_to_activate": threshold_generator(
            0.4, 0.4, 1, 1000000
        ),
        "vertical_merge_algorithm_min_columns_to_activate": threshold_generator(
            0.4, 0.4, 1, 100
        ),
        "allow_vertical_merges_from_compact_to_wide_parts": lambda: random.randint(
            0, 1
        ),
        "min_merge_bytes_to_use_direct_io": threshold_generator(
            0.25, 0.25, 1, 10 * 1024 * 1024 * 1024
        ),
        "index_granularity_bytes": lambda: random.randint(1024, 30 * 1024 * 1024),
        "merge_max_block_size": lambda: random.randint(1, 8192 * 3),
        "index_granularity": lambda: random.randint(1, 65536),
        "min_bytes_for_wide_part": threshold_generator(0.3, 0.3, 0, 1024 * 1024 * 1024),
        "compress_marks": lambda: random.randint(0, 1),
        "compress_primary_key": lambda: random.randint(0, 1),
        "marks_compress_block_size": lambda: random.randint(8000, 100000),
        "primary_key_compress_block_size": lambda: random.randint(8000, 100000),
        "replace_long_file_name_to_hash": lambda: random.randint(0, 1),
        "max_file_name_length": threshold_generator(0.3, 0.3, 0, 128),
        "min_bytes_for_full_part_storage": threshold_generator(
            0.3, 0.3, 0, 512 * 1024 * 1024
        ),
        "compact_parts_max_bytes_to_buffer": lambda: random.randint(
            1024, 512 * 1024 * 1024
        ),
        "compact_parts_max_granules_to_buffer": threshold_generator(0.15, 0.15, 1, 256),
        "compact_parts_merge_max_bytes_to_prefetch_part": lambda: random.randint(
            1, 32 * 1024 * 1024
        ),
        "cache_populated_by_fetch": lambda: random.randint(0, 1),
        "concurrent_part_removal_threshold": threshold_generator(0.2, 0.3, 0, 100),
        "old_parts_lifetime": threshold_generator(0.2, 0.3, 10, 8 * 60),
    }

    @staticmethod
    def get_random_settings(args):
        """Return generated values for all settings not already changed.

        Settings listed in ``args.changed_merge_tree_settings`` are skipped
        and their generators are not called.
        """
        return {
            name: generate()
            for name, generate in MergeTreeSettingsRandomizer.settings.items()
            if name not in args.changed_merge_tree_settings
        }
def replace_in_file(filename, what, with_what):
    """Replace every match of *what* with *with_what* in *filename*, in place.

    The work is done by ``sed -i`` with the expression
    ``s|what|with_what|g`` under ``LC_ALL=C``, so *what* is a sed regular
    expression and neither argument may contain an unescaped "|".

    sed is started directly with an argument list instead of through a
    shell, so file names with spaces or shell metacharacters and patterns
    containing single quotes are passed through intact. The exit status of
    sed is not checked.
    """
    subprocess.run(
        ["sed", "-i", "-e", f"s|{what}|{with_what}|g", filename],
        env={**os.environ, "LC_ALL": "C"},
        check=False,
    )
class TestResult:
    """Outcome of a single test case run."""

    def __init__(
        self,
        case_name: str,
        status: TestStatus,
        reason: Optional[FailureReason],
        total_time: float,
        description: str,
    ):
        self.case_name: str = case_name
        self.status: TestStatus = status
        self.reason: Optional[FailureReason] = reason
        self.total_time: float = total_time
        self.description: str = description
        # Set by check_if_need_retry(); never reset to False afterwards.
        self.need_retry: bool = False

    def check_if_need_retry(self, args, stdout, stderr, runs_count):
        """Set ``need_retry`` when a failed run qualifies for another attempt.

        The flag is raised only for a FAIL status, when need_retry() agrees,
        and while *runs_count* does not exceed MAX_RETRIES.
        """
        if self.status != TestStatus.FAIL:
            return
        if not need_retry(args, stdout, stderr, self.total_time):
            return
        if MAX_RETRIES < runs_count:
            return
        self.need_retry = True
class TestCase:
@staticmethod
def get_description_from_exception_info(exc_info):
exc_type, exc_value, tb = exc_info
exc_name = exc_type.__name__
traceback_str = "\n".join(traceback.format_tb(tb, 10))
description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
return description
@staticmethod
def get_reference_file(suite_dir, name):
"""
Returns reference file name for specified test
"""
name = removesuffix(name, ".gen")
2022-04-27 11:02:45 +00:00
for ext in [".reference", ".gen.reference"]:
reference_file = os.path.join(suite_dir, name) + ext
if os.path.isfile(reference_file):
return reference_file
return None
@staticmethod
def configure_testcase_args(args, case_file, suite_tmp_dir):
testcase_args = copy.deepcopy(args)
testcase_args.testcase_start_time = datetime.now()
testcase_basename = os.path.basename(case_file)
2022-04-27 11:02:45 +00:00
testcase_args.testcase_client = (
f"{testcase_args.client} --log_comment '{testcase_basename}'"
)
testcase_args.testcase_basename = testcase_basename
2021-08-06 14:38:28 +00:00
if testcase_args.database:
database = testcase_args.database
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
testcase_args.test_tmp_dir = suite_tmp_dir
else:
2022-04-28 11:26:49 +00:00
# If --database is not specified, we will create temporary database with
# unique name and we will recreate and drop it for each test
2022-08-31 18:53:57 +00:00
def random_str(length=8):
alphabet = string.ascii_lowercase + string.digits
# NOTE: it is important not to use default random generator, since it shares state.
2022-04-27 11:02:45 +00:00
return "".join(
random.SystemRandom().choice(alphabet) for _ in range(length)
)
2021-08-06 14:38:28 +00:00
2022-04-28 11:26:49 +00:00
database = f"test_{random_str()}"
2021-08-06 14:38:28 +00:00
2022-04-27 11:02:45 +00:00
clickhouse_execute(
args,
"CREATE DATABASE IF NOT EXISTS "
+ database
+ get_db_engine(testcase_args, database),
2022-08-12 09:28:16 +00:00
settings=get_create_database_settings(args, testcase_args),
2022-04-27 11:02:45 +00:00
)
2021-08-06 14:38:28 +00:00
os.environ["CLICKHOUSE_DATABASE"] = database
# Set temporary directory to match the randomly generated database,
# because .sh tests also use it for temporary files and we want to avoid
# collisions.
testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
os.mkdir(testcase_args.test_tmp_dir)
os.environ["CLICKHOUSE_TMP"] = testcase_args.test_tmp_dir
testcase_args.testcase_database = database
# Printed only in case of failures
#
# NOTE: here we use "CLICKHOUSE_TMP" instead of "file_suffix",
# so it is installed in configure_testcase_args() unlike other files
# (stdout_file, stderr_file) in TestCase::__init__().
# Since using CLICKHOUSE_TMP is easier to use in expect.
testcase_args.debug_log_file = (
os.path.join(testcase_args.test_tmp_dir, testcase_basename) + ".debuglog"
)
return testcase_args
@staticmethod
2022-07-14 11:42:12 +00:00
def cli_format_settings(settings_list) -> str:
out = []
for k, v in settings_list.items():
out.extend([f"--{k}", str(v)])
return " ".join(out)
@staticmethod
def http_format_settings(settings_list) -> str:
return urllib.parse.urlencode(settings_list)
2022-04-28 11:26:49 +00:00
def has_show_create_table_in_test(self):
    """
    Tell whether the test file contains "show create" (case-insensitively)
    on any single line.

    The file is scanned in-process as raw bytes instead of spawning `grep`
    for every test case; this removes one fork/exec per test and the
    dependency on an external binary. An unreadable or missing file yields
    False, matching the previous behavior where any non-zero `grep` exit
    status meant "not found".
    """
    needle = b"show create"
    try:
        with open(self.case_file, "rb") as test_file:
            # bytes.lower() only folds ASCII, which is all the needle contains.
            return any(needle in line.lower() for line in test_file)
    except OSError:
        return False
2022-02-11 14:15:56 +00:00
def add_random_settings(self, client_options):
    """
    Append the randomized settings of this test to the client options.

    Side effects: CLICKHOUSE_URL_PARAMS is extended with the random settings
    (when settings are randomized) and CLICKHOUSE_CLIENT_OPT is set to the base
    options plus the generated ones (when anything was generated), so that
    .sh tests pick them up too.

    Returns client_options with the generated options appended.
    """
    generated = []

    if self.randomize_settings:
        http_params = self.http_format_settings(self.random_settings)
        if len(self.base_url_params) == 0:
            url_params = http_params
        else:
            url_params = self.base_url_params + "&" + http_params
        os.environ["CLICKHOUSE_URL_PARAMS"] = url_params

        generated.append(f" {self.cli_format_settings(self.random_settings)}")

    if self.randomize_merge_tree_settings:
        generated.append(
            f" --allow_merge_tree_settings {self.cli_format_settings(self.merge_tree_random_settings)}"
        )

    if not generated:
        return client_options

    # The same setting may now appear both in the base and generated options.
    generated.append(" --allow_repeated_settings")
    new_options = "".join(generated)

    os.environ["CLICKHOUSE_CLIENT_OPT"] = (
        self.base_client_options + new_options + " "
    )

    return client_options + new_options
def remove_random_settings_from_env(self):
    """
    Restore CLICKHOUSE_URL_PARAMS and CLICKHOUSE_CLIENT_OPT to the values
    captured when this test case was constructed.
    """
    restored = (
        ("CLICKHOUSE_URL_PARAMS", self.base_url_params),
        ("CLICKHOUSE_CLIENT_OPT", self.base_client_options),
    )
    for variable, base_value in restored:
        os.environ[variable] = base_value
def add_info_about_settings(self, description):
    """
    Return `description` followed by the randomized settings (and MergeTree
    settings) used by the test, when they were randomized, and a final newline.
    """
    parts = [description]
    if self.randomize_settings:
        parts.append(
            f"\nSettings used in the test: {self.cli_format_settings(self.random_settings)}"
        )
    if self.randomize_merge_tree_settings:
        parts.append(
            f"\n\nMergeTree settings used in test: {self.cli_format_settings(self.merge_tree_random_settings)}"
        )
    parts.append("\n")
    return "".join(parts)
def __init__(self, suite, case: str, args, is_concurrent: bool):
    """
    Describe one test case of `suite`.

    Collects the tags of the test (its own tags plus the comma-separated
    GLOBAL_TAGS environment variable), derives the reference/stdout/stderr
    file names, decides whether settings are randomized and, if so, draws
    the random settings. The current CLICKHOUSE_URL_PARAMS and
    CLICKHOUSE_CLIENT_OPT values are remembered so they can be restored later.
    """
    self.case: str = case  # case file name

    # Work on a private copy: the set stored in suite.all_tags is shared with
    # the suite and must not be modified by adding the global tags to it.
    self.tags: Set[str] = set(suite.all_tags[case]) if case in suite.all_tags else set()

    for tag in os.getenv("GLOBAL_TAGS", "").split(","):
        tag = tag.strip()
        # "".split(",") yields [""], so skip empty entries instead of
        # registering a meaningless empty tag.
        if tag:
            self.tags.add(tag)

    self.case_file: str = os.path.join(suite.suite_path, case)
    (self.name, self.ext) = os.path.splitext(case)

    # Several concurrent runs of the same test must not share output files.
    file_suffix = f".{os.getpid()}" if is_concurrent and args.test_runs > 1 else ""
    self.reference_file = self.get_reference_file(suite.suite_path, self.name)
    self.stdout_file = (
        os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stdout"
    )
    self.stderr_file = (
        os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stderr"
    )

    self.testcase_args = None
    self.runs_count = 0

    has_no_random_settings_tag = "no-random-settings" in self.tags

    self.randomize_settings = not (
        args.no_random_settings or has_no_random_settings_tag
    )

    has_no_random_merge_tree_settings_tag = (
        "no-random-merge-tree-settings" in self.tags
    )

    # If test contains SHOW CREATE TABLE do not
    # randomize merge tree settings, because
    # they will be added to table definition and test will fail
    self.randomize_merge_tree_settings = not (
        args.no_random_merge_tree_settings
        or has_no_random_settings_tag
        or has_no_random_merge_tree_settings_tag
        or self.has_show_create_table_in_test()
    )

    if self.randomize_settings:
        self.random_settings = SettingsRandomizer.get_random_settings(args)

    if self.randomize_merge_tree_settings:
        self.merge_tree_random_settings = (
            MergeTreeSettingsRandomizer.get_random_settings(args)
        )

    self.base_url_params = os.environ.get("CLICKHOUSE_URL_PARAMS", "")
    self.base_client_options = os.environ.get("CLICKHOUSE_CLIENT_OPT", "")

    self.show_whitespaces_in_diff = args.show_whitespaces_in_diff
# should skip test, should increment skipped_total, skip reason
def should_skip_test(self, suite) -> Optional[FailureReason]:
    """
    Decide whether the test must be skipped under the current configuration.

    The checks are evaluated in a fixed order against the test tags, the
    suite skip lists and the module-level `args`; the first matching rule
    determines the returned FailureReason. None means "run the test".
    """
    tags = self.tags
    disabled_marker = os.path.join(suite.suite_path, self.name) + ".disabled"

    if tags and ("disabled" in tags) and not args.disabled:
        return FailureReason.DISABLED

    if args.private and self.name in suite.private_skip_list:
        return FailureReason.NOT_SUPPORTED_IN_PRIVATE

    if args.cloud and ("no-replicated-database" in tags):
        return FailureReason.REPLICATED_DB

    if args.cloud and self.name in suite.cloud_skip_list:
        return FailureReason.NOT_SUPPORTED_IN_CLOUD

    if os.path.exists(disabled_marker) and not args.disabled:
        return FailureReason.DISABLED

    if "no-parallel-replicas" in tags and args.no_parallel_replicas:
        return FailureReason.NO_PARALLEL_REPLICAS

    if args.skip and any(s in self.name for s in args.skip):
        return FailureReason.SKIP

    if not USE_JINJA and self.ext.endswith("j2"):
        return FailureReason.NO_JINJA

    if (
        tags
        and (("zookeeper" in tags) or ("replica" in tags))
        and not args.zookeeper
    ):
        return FailureReason.NO_ZOOKEEPER

    if (
        tags
        and (("shard" in tags) or ("distributed" in tags) or ("global" in tags))
        and not args.shard
    ):
        return FailureReason.NO_SHARD

    if tags and ("no-fasttest" in tags) and args.fast_tests_only:
        return FailureReason.FAST_ONLY

    if (
        tags
        and (("long" in tags) or ("deadlock" in tags) or ("race" in tags))
        and args.no_long
    ):
        # Tests for races and deadlocks usually are run in a loop for a significant amount of time
        return FailureReason.NO_LONG

    if tags and ("no-replicated-database" in tags) and args.replicated_database:
        return FailureReason.REPLICATED_DB

    if (
        tags
        and ("atomic-database" in tags)
        and (args.replicated_database or args.db_engine not in (None, "Atomic"))
    ):
        return FailureReason.NON_ATOMIC_DB

    if (
        tags
        and ("no-shared-merge-tree" in tags)
        and args.replace_replicated_with_shared
    ):
        return FailureReason.SHARED_MERGE_TREE

    if tags and ("no-s3-storage" in tags) and args.s3_storage:
        return FailureReason.S3_STORAGE

    if tags and ("no-azure-blob-storage" in tags) and args.azure_blob_storage:
        return FailureReason.AZURE_BLOB_STORAGE

    if (
        tags
        and ("no-object-storage" in tags)
        and (args.azure_blob_storage or args.s3_storage)
    ):
        return FailureReason.OBJECT_STORAGE

    if (
        tags
        and "no-object-storage-with-slow-build" in tags
        and (args.s3_storage or args.azure_blob_storage)
        and BuildFlags.RELEASE not in args.build_flags
    ):
        return FailureReason.OBJECT_STORAGE

    if "no-batch" in tags and (
        args.run_by_hash_num is not None or args.run_by_hash_total is not None
    ):
        return FailureReason.SKIP

    if tags:
        # "no-<flag>" excludes the test from builds having that flag ...
        if any("no-" + build_flag in tags for build_flag in args.build_flags):
            return FailureReason.BUILD

        # ... while "use-<something>" requires the matching build flag.
        for tag in tags:
            required_flag = tag.replace("-", "_")
            if (
                required_flag.startswith("use_")
                and required_flag not in args.build_flags
            ):
                return FailureReason.BUILD

    return None
2022-07-08 19:27:16 +00:00
def process_result_impl(
    self, proc, stdout: str, stderr: str, debug_log: str, total_time: float
):
    """
    Convert the raw outcome of a test process into a TestResult.

    The checks are applied in this order and the first match wins:
    timeout (process still running; its process group is killed), non-zero
    exit code, non-empty stderr, "Exception" in stdout, "@@SKIP@@" in stdout,
    missing reference file, difference with the reference file, and - when
    the test is run several times - exceeding TEST_MAX_RUN_TIME_IN_SECONDS.
    On success the stdout, stderr and debug log files are removed.

    Only the first 100 lines of `debug_log` are included in descriptions.
    """
    description = ""

    if debug_log:
        debug_log = "\n".join(debug_log.splitlines()[:100])

    if proc:
        if proc.returncode is None:
            try:
                pgid = os.getpgid(proc.pid)
                # NOTE: this still may leave some processes, that had been
                # created by timeout(1), since it also creates new process
                # group. But this should not be a problem with default
                # options, since the default time for each test is 10min,
                # and this is way more bigger then the timeout for each
                # timeout(1) invocation.
                #
                # But as a workaround we are sending SIGTERM first, and
                # only after SIGKILL, that way timeout(1) will have an
                # ability to terminate childrens (though not always since
                # signals are asynchronous).
                os.killpg(pgid, signal.SIGTERM)
                # This may not be enough, but this is at least something
                # (and anyway it is OK to spend 0.1 second more in case of
                # test timeout).
                sleep(0.1)
                os.killpg(pgid, signal.SIGKILL)
            except OSError as e:
                # The process (group) may already be gone.
                if e.errno != ESRCH:
                    raise

            if stderr:
                description += stderr
            if debug_log:
                description += "\n"
                description += debug_log
            return TestResult(
                self.name,
                TestStatus.FAIL,
                FailureReason.TIMEOUT,
                total_time,
                description,
            )

        if proc.returncode != 0:
            reason = FailureReason.EXIT_CODE
            description += str(proc.returncode)

            if stderr:
                description += "\n"
                description += stderr
            if debug_log:
                description += "\n"
                description += debug_log

            # Stop on fatal errors like segmentation fault. They are sent to client via logs.
            if " <Fatal> " in stderr:
                reason = FailureReason.SERVER_DIED

            if (
                self.testcase_args.stop
                and (
                    "Connection refused" in stderr
                    or "Attempt to read after eof" in stderr
                )
                and "Received exception from server" not in stderr
            ):
                reason = FailureReason.SERVER_DIED

            if os.path.isfile(self.stdout_file):
                description += ", result:\n\n"
                with open(self.stdout_file, "rb") as f:
                    description += trim_for_log(
                        f.read().decode("utf-8", errors="ignore")
                    )
                description += "\n"

            description += f"\nstdout:\n{stdout}\n"
            return TestResult(
                self.name, TestStatus.FAIL, reason, total_time, description
            )

    if stderr:
        description += "\n"
        description += trim_for_log(stderr)
        description += "\n"
        description += "\nstdout:\n"
        description += trim_for_log(stdout)
        description += "\n"

        if debug_log:
            description += "\n"
            description += debug_log
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.STDERR,
            total_time,
            description,
        )

    if "Exception" in stdout:
        description += "\n{}\n".format("\n".join(stdout.splitlines()[:100]))
        if debug_log:
            description += "\n"
            description += debug_log
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.EXCEPTION,
            total_time,
            description,
        )

    if "@@SKIP@@" in stdout:
        skip_reason = stdout.replace("@@SKIP@@", "").rstrip("\n")
        description += " - "
        description += skip_reason
        return TestResult(
            self.name,
            TestStatus.SKIPPED,
            FailureReason.SKIP,
            total_time,
            description,
        )

    if self.reference_file is None:
        return TestResult(
            self.name,
            TestStatus.UNKNOWN,
            FailureReason.NO_REFERENCE,
            total_time,
            description,
        )

    # Only the exit status matters here. The output is discarded with DEVNULL
    # rather than PIPE: subprocess.call() never reads from a pipe, so PIPE
    # must not be used with it.
    result_is_different = subprocess.call(
        ["diff", "-q", self.reference_file, self.stdout_file],
        stdout=subprocess.DEVNULL,
    )

    if result_is_different:
        with Popen(
            [
                "diff",
                "-U",
                str(self.testcase_args.unified),
                self.reference_file,
                self.stdout_file,
            ],
            stdout=PIPE,
            universal_newlines=True,
        ) as diff_proc:
            if self.show_whitespaces_in_diff:
                # Mark trailing whitespace with "$" so it is visible in the diff.
                with Popen(
                    ["sed", "-e", "s/[ \t]\\+$/&$/g"],
                    stdin=diff_proc.stdout,
                    stdout=PIPE,
                ) as sed_proc:
                    diff = sed_proc.communicate()[0].decode(
                        "utf-8", errors="ignore"
                    )
            else:
                diff = diff_proc.communicate()[0]

        if diff.startswith("Binary files "):
            diff += "Content of stdout:\n===================\n"
            with open(self.stdout_file, "rb") as file:
                diff += str(file.read())
            diff += "==================="
        description += f"\n{diff}\n"
        if debug_log:
            description += "\n"
            description += debug_log
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.RESULT_DIFF,
            total_time,
            description,
        )

    if (
        self.testcase_args.test_runs > 1
        and total_time > TEST_MAX_RUN_TIME_IN_SECONDS
        and "long" not in self.tags
    ):
        if debug_log:
            description += "\n"
            description += debug_log
        # We're in Flaky Check mode, check the run time as well while we're at it.
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.TOO_LONG,
            total_time,
            description,
        )

    # The test passed: its output files are not needed anymore.
    if os.path.exists(self.stdout_file):
        os.remove(self.stdout_file)
    if os.path.exists(self.stderr_file):
        os.remove(self.stderr_file)
    if os.path.exists(self.testcase_args.debug_log_file):
        os.remove(self.testcase_args.debug_log_file)

    return TestResult(self.name, TestStatus.OK, None, total_time, description)
@staticmethod
def print_test_time(test_time) -> str:
if args.print_time:
2022-04-28 11:26:49 +00:00
return f" {test_time:.2f} sec."
else:
2022-04-27 11:02:45 +00:00
return ""
def process_result(self, result: TestResult, messages):
    """
    Build the final, human readable description of a test result.

    The description consists of the status message from `messages`, the
    elapsed time, the failure reason, the original description and, for
    failed tests, the database name. When per-test coverage collection is
    enabled (module-level `args`) the collected coverage is first stored
    into system.coverage_log and its size is appended to the description.

    The same `result` object is updated in place and returned.
    """
    pieces = [messages[result.status], self.print_test_time(result.total_time)]
    if result.reason is not None:
        pieces.append(f"\nReason: {result.reason.value} ")
    pieces.append(result.description)

    collect_coverage = (
        args.collect_per_test_coverage
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    )
    if collect_coverage:
        try:
            clickhouse_execute(
                args,
                f"INSERT INTO system.coverage_log SELECT now(), '{self.case}', coverageCurrent()",
                retry_error_codes=True,
            )
        except Exception as e:
            print("Cannot insert coverage data: ", str(e))

        # Check for dumped coverage files
        for file_path in glob.glob("coverage.*"):
            try:
                body = read_file_as_binary_string(file_path)
                clickhouse_execute(
                    args,
                    "INSERT INTO system.coverage_log "
                    "SETTINGS async_insert=1, wait_for_async_insert=0, async_insert_busy_timeout_min_ms=200, async_insert_busy_timeout_max_ms=1000 "
                    f"SELECT now(), '{self.case}', groupArray(data) FROM input('data UInt64') FORMAT RowBinary",
                    body=body,
                    retry_error_codes=True,
                )
            except Exception as e:
                print("Cannot insert coverage data: ", str(e))
            # Remove the file even in case of exception to avoid accumulation and quadratic complexity.
            os.remove(file_path)

        _ = clickhouse_execute(args, "SYSTEM FLUSH ASYNC INSERT QUEUE")

        coverage = clickhouse_execute(
            args,
            "SELECT length(coverageCurrent())",
            retry_error_codes=True,
        ).decode()

        pieces.append(f" (coverage: {coverage})")

    pieces.append("\n")

    if result.status == TestStatus.FAIL and self.testcase_args:
        pieces.append("Database: " + self.testcase_args.testcase_database)

    result.description = "".join(pieces)
    return result
@staticmethod
2022-04-28 11:26:49 +00:00
def send_test_name_failed(suite: str, case: str):
pid = os.getpid()
clickhouse_execute(
args,
f"SELECT 'Running test {suite}/{case} from pid={pid}'",
retry_error_codes=True,
)
2022-04-28 11:26:49 +00:00
def run_single_test(
    self, server_logs_level, client_options
) -> Tuple[Optional[Popen], str, str, str, float]:
    """
    Execute the test file once and collect its output.

    The test is started through the shell in a new session and waited for
    up to `timeout` seconds; a process that is still running afterwards is
    returned as is (returncode is None) and handled by the caller.
    Afterwards the stdout file (and optionally the stderr file) is normalized:
    the database name, host name, ports and data path are replaced with their
    default values.

    Returns (process, stdout, stderr, debug_log, total_time_in_seconds).
    """
    args = self.testcase_args
    database = args.testcase_database
    started_at = args.testcase_start_time

    if args.client_log:
        log_opt = " --client_logs_file=" + args.client_log + " "
        client_options += log_opt
        os.environ["CLICKHOUSE_CLIENT_OPT"] = (
            os.environ.get("CLICKHOUSE_CLIENT_OPT", "") + log_opt
        )

    # This is for .sh tests
    os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename

    client = args.testcase_client + " --database=" + database
    if "need-query-parameters" in self.tags:
        client += (
            " --param_CLICKHOUSE_DATABASE="
            + database
            + " --param_CLICKHOUSE_DATABASE_1="
            + database
            + "_1"
        )

    params = {
        "client": client,
        "logs_level": server_logs_level,
        "options": client_options,
        "test": self.case_file,
        "stdout": self.stdout_file,
        "stderr": self.stderr_file,
        "secure": "--secure" if args.secure else "",
    }

    # >> append to stderr (but not stdout since it is not used there),
    # because there are also output of per test database creation
    redirects = "{test} > {stdout} 2> {stderr}"

    if self.ext != ".sql":
        pattern = redirects
    elif args.cloud:
        # Get at least some logs, because we don't have access to system.text_log and pods...
        pattern = (
            "{client} --send_logs_level={logs_level} {secure} --multiquery {options}"
            " --send_logs_level=trace < {test} > {stdout} 2>> /test_output/some_logs_from_server.log"
        )
    else:
        pattern = (
            "{client} --send_logs_level={logs_level} {secure} --multiquery {options} < "
            + redirects
        )

    # We want to calculate per-test code coverage. That's why we reset it before each test.
    if (
        args.collect_per_test_coverage
        and args.reset_coverage_before_every_test
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    ):
        clickhouse_execute(
            args,
            "SYSTEM RESET COVERAGE",
            retry_error_codes=True,
        )

    command = pattern.format(**params)

    # pylint:disable-next=consider-using-with; TODO: fix
    proc = Popen(command, shell=True, env=os.environ, start_new_session=True)

    try:
        proc.wait(args.timeout)
    except subprocess.TimeoutExpired:
        # Whether the test timed out will be decided later
        pass

    def read_text(path):
        # Missing files read as empty; undecodable bytes are replaced.
        if not os.path.exists(path):
            return ""
        with open(path, "rb") as stream:
            return str(stream.read(), errors="replace", encoding="utf-8")

    debug_log = ""
    if os.path.exists(args.debug_log_file):
        debug_log = (
            args.debug_log_file + ":\n" + read_text(args.debug_log_file) + "\n"
        )

    total_time = (datetime.now() - started_at).total_seconds()

    # (file, what, with_what) triples, applied in this order.
    # Normalize randomized database names in stdout, stderr files.
    replacements = [(self.stdout_file, database, "default")]
    if args.hide_db_name:
        replacements.append((self.stderr_file, database, "default"))
    if args.replicated_database:
        replacements.append((self.stdout_file, "/auto_{shard}", ""))
        replacements.append((self.stdout_file, "auto_{replica}", ""))

    # Normalize hostname in stdout file.
    replacements.append((self.stdout_file, socket.gethostname(), "localhost"))

    tcp_port = os.environ.get("CLICKHOUSE_PORT_TCP")
    if tcp_port:
        replacements.append((self.stdout_file, f"PORT {tcp_port}", "PORT 9000"))
        replacements.append(
            (self.stdout_file, f"localhost {tcp_port}", "localhost 9000")
        )

    tcp_secure_port = os.environ.get("CLICKHOUSE_PORT_TCP_SECURE")
    if tcp_secure_port:
        replacements.append(
            (self.stdout_file, f"PORT {tcp_secure_port}", "PORT 9440")
        )
        replacements.append(
            (self.stdout_file, f"localhost {tcp_secure_port}", "localhost 9440")
        )

    clickhouse_path = os.environ.get("CLICKHOUSE_PATH")
    if clickhouse_path:
        replacements.append(
            (self.stdout_file, clickhouse_path, "/var/lib/clickhouse")
        )

    https_port = os.environ.get("CLICKHOUSE_PORT_HTTPS")
    if https_port:
        replacements.append(
            (
                self.stdout_file,
                f"https://localhost:{https_port}/",
                "https://localhost:8443/",
            )
        )

    for filename, what, with_what in replacements:
        replace_in_file(filename, what, with_what)

    stdout = read_text(self.stdout_file)
    stderr = read_text(self.stderr_file)

    return proc, stdout, stderr, debug_log, total_time
def run(self, args, suite, client_options, server_logs_level):
    """
    Run the test case once and return its TestResult.

    Skipped tests and a failed health check return early. Otherwise the
    per-test arguments are configured, random settings are applied, the test
    is executed (with engine replacement for valid UTF-8 files) and judged,
    and the test database is cleaned up. Exceptions are converted into
    failed/unknown results; KeyboardInterrupt is re-raised. The randomized
    settings are always removed from the environment at the end.
    """
    start_time = datetime.now()

    def result_from_current_exception(status, reason, with_settings):
        # Must be called from an `except` block: it relies on sys.exc_info().
        total_time = (datetime.now() - start_time).total_seconds()
        description = self.get_description_from_exception_info(sys.exc_info())
        if with_settings:
            description = self.add_info_about_settings(description)
        return TestResult(self.name, status, reason, total_time, description)

    try:
        skip_reason = self.should_skip_test(suite)
        if skip_reason is not None:
            return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0.0, "")

        if args.testname:
            try:
                self.send_test_name_failed(suite.suite, self.case)
            except Exception:
                return TestResult(
                    self.name,
                    TestStatus.FAIL,
                    FailureReason.SERVER_DIED,
                    0.0,
                    "\nServer does not respond to health check\n",
                )

        self.runs_count += 1
        self.testcase_args = self.configure_testcase_args(
            args, self.case_file, suite.suite_tmp_path
        )

        client_options = self.add_random_settings(client_options)

        def run_and_judge():
            proc, stdout, stderr, debug_log, total_time = self.run_single_test(
                server_logs_level, client_options
            )
            outcome = self.process_result_impl(
                proc, stdout, stderr, debug_log, total_time
            )
            outcome.check_if_need_retry(args, stdout, stderr, self.runs_count)
            # to avoid breaking CSV parser
            outcome.description = outcome.description.replace("\0", "")
            return outcome

        has_invalid_utf_8 = not is_valid_utf_8(self.case_file) or (
            self.reference_file and not is_valid_utf_8(self.reference_file)
        )

        if has_invalid_utf_8:
            # Such files are run as is, without engine replacement.
            result = run_and_judge()
        else:
            with SharedEngineReplacer(
                self.case_file,
                args.replace_replicated_with_shared,
                args.replace_non_replicated_with_shared,
                False,
            ):
                with SharedEngineReplacer(
                    self.reference_file,
                    args.replace_replicated_with_shared,
                    args.replace_non_replicated_with_shared,
                    True,
                ):
                    # The result is compared with the reference while the
                    # replacement is still in effect.
                    result = run_and_judge()

        if result.status == TestStatus.FAIL:
            result.description = self.add_info_about_settings(result.description)

        self._cleanup(result.status == TestStatus.OK)

        return result
    except KeyboardInterrupt as e:
        raise e
    except HTTPError:
        return result_from_current_exception(
            TestStatus.FAIL, FailureReason.INTERNAL_QUERY_FAIL, True
        )
    except socket.timeout:
        return result_from_current_exception(
            TestStatus.FAIL, FailureReason.TIMEOUT, True
        )
    except (ConnectionError, http.client.ImproperConnectionState):
        return result_from_current_exception(
            TestStatus.FAIL, FailureReason.SERVER_DIED, True
        )
    except Exception:
        return result_from_current_exception(
            TestStatus.UNKNOWN, FailureReason.INTERNAL_ERROR, False
        )
    finally:
        self.remove_random_settings_from_env()
def _cleanup(self, passed):
    """
    Drop the per-test database and remove the per-test temporary directory.

    Nothing is done when an explicit --database is used, or when
    no_drop_if_fail is set and the test did not pass. The database cleanup
    gets the remaining part of the test timeout, but at least 20 seconds.
    """
    args = self.testcase_args

    if args.database:
        return
    if args.no_drop_if_fail and not passed:
        return

    elapsed = (datetime.now() - args.testcase_start_time).total_seconds()
    remaining = max(args.timeout - elapsed, 20)

    self._cleanup_database(args, remaining)
    shutil.rmtree(args.test_tmp_dir)
def _cleanup_database(self, args, timeout):
    """
    Check that the test removed its tables and drop the per-test database.

    For tests whose file name sorts at or after "02800" any table left in the
    database raises TestException (before the database is dropped). The
    DROP DATABASE query is attempted up to 59 times as long as need_retry()
    classifies the HTTPError as retryable; other errors are re-raised.

    NOTE: if all attempts fail with a retryable error the method returns
    without raising and the database is left in place.
    """
    database = args.testcase_database

    # Check if the test does not cleanup its tables.
    # Only for newly added tests. Please extend this check to the old tests as well.
    #
    # The file name is compared, not self.case_file itself: case_file is a
    # full path, and an absolute path starts with "/", which sorts before
    # "0", so comparing the path never enabled this check.
    if os.path.basename(self.case_file) >= "02800":
        leftover_tables = (
            clickhouse_execute(
                args,
                f"SHOW TABLES FROM {database}",
                timeout=timeout,
                settings={
                    "log_comment": args.testcase_basename,
                },
            )
            .decode()
            .replace("\n", ", ")
        )

        if len(leftover_tables) != 0:
            raise TestException(
                f"The test should cleanup its tables ({leftover_tables}), otherwise it is inconvenient for running it locally."
            )

    drop_database_query = f"DROP DATABASE IF EXISTS {database}"
    if args.replicated_database:
        drop_database_query += " ON CLUSTER test_cluster_database_replicated"

    # It's possible to get an error "New table appeared in database being dropped or detached. Try again."
    for _ in range(1, 60):
        try:
            clickhouse_execute(
                args,
                drop_database_query,
                timeout=timeout,
                settings={
                    "log_comment": args.testcase_basename,
                },
            )
        except HTTPError as e:
            if need_retry(args, e.message, e.message, 0):
                continue
            raise
        break
class TestSuite:
@staticmethod
2022-04-28 11:26:49 +00:00
def tests_in_suite_key_func(item: str) -> float:
2022-04-27 11:02:45 +00:00
if args.order == "random":
return random.random()
2022-04-27 11:02:45 +00:00
reverse = 1 if args.order == "asc" else -1
2022-04-27 11:02:45 +00:00
if -1 == item.find("_"):
return 99998
2022-04-27 11:02:45 +00:00
prefix, _ = item.split("_", 1)
try:
return reverse * int(prefix)
except ValueError:
return 99997
@staticmethod
def render_test_template(j2env, suite_dir, test_name):
"""
Render template for test and reference file if needed
"""
if j2env is None:
return test_name
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
reference_file_name = test_base_name + ".reference.j2"
reference_file_path = os.path.join(suite_dir, reference_file_name)
if os.path.isfile(reference_file_path):
tpl = j2env.get_template(reference_file_name)
2022-04-27 11:02:45 +00:00
tpl.stream().dump(
os.path.join(suite_dir, test_base_name) + ".gen.reference"
)
if test_name.endswith(".sql.j2"):
tpl = j2env.get_template(test_name)
generated_test_name = test_base_name + ".gen.sql"
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
return generated_test_name
return test_name
@staticmethod
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
def get_comment_sign(filename):
2022-04-27 11:02:45 +00:00
if filename.endswith(".sql") or filename.endswith(".sql.j2"):
return "--"
elif (
filename.endswith(".sh")
or filename.endswith(".py")
or filename.endswith(".expect")
):
return "#"
else:
2024-02-26 17:46:15 +00:00
raise TestException(f"Unknown file_extension: {filename}")
def parse_tags_from_line(line, comment_sign) -> Set[str]:
if not line.startswith(comment_sign):
2023-08-15 18:37:39 +00:00
return set()
2022-04-28 11:26:49 +00:00
tags_str = line[len(comment_sign) :].lstrip() # noqa: ignore E203
tags_prefix = "Tags:"
if not tags_str.startswith(tags_prefix):
2023-08-15 18:37:39 +00:00
return set()
2022-04-28 11:26:49 +00:00
tags_str = tags_str[len(tags_prefix) :] # noqa: ignore E203
2022-04-27 11:02:45 +00:00
tags = tags_str.split(",")
tags = {tag.strip() for tag in tags}
return tags
2022-04-28 11:26:49 +00:00
def is_shebang(line: str) -> bool:
2022-04-27 11:02:45 +00:00
return line.startswith("#!")
def find_tag_line(file):
for line in file:
line = line.strip()
if line and not is_shebang(line):
return line
return ""
def load_tags_from_file(filepath):
comment_sign = get_comment_sign(filepath)
2023-08-15 18:37:39 +00:00
need_query_params = False
2022-04-28 11:26:49 +00:00
with open(filepath, "r", encoding="utf-8") as file:
try:
2023-08-15 18:37:39 +00:00
tag_line = find_tag_line(file)
except UnicodeDecodeError:
return []
2023-08-15 18:37:39 +00:00
try:
if filepath.endswith(".sql"):
for line in file:
if "{CLICKHOUSE_DATABASE" in line:
need_query_params = True
except UnicodeDecodeError:
pass
parsed_tags = parse_tags_from_line(tag_line, comment_sign)
if need_query_params:
parsed_tags.add("need-query-parameters")
return parsed_tags
all_tags = {}
start_time = datetime.now()
for test_name in all_tests:
tags = load_tags_from_file(os.path.join(suite_dir, test_name))
if tags:
all_tags[test_name] = tags
elapsed = (datetime.now() - start_time).total_seconds()
if elapsed > 1:
print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
return all_tags
def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
    """
    Collect the tests of one suite.

    When both run_by_hash_num and run_by_hash_total are given, only tests
    whose name hash falls into the requested bucket are selected. The
    selected tests are sorted, their tags are read, and they are split into
    sequential and parallel ones.

    Raises TestException when run_by_hash_num is not a valid bucket index,
    i.e. not within [0, run_by_hash_total).
    """
    self.args = args
    self.suite_path: str = suite_path
    self.suite_tmp_path: str = suite_tmp_path
    self.suite: str = suite
    self.cloud_skip_list: List[str] = []
    self.private_skip_list: List[str] = []

    hash_num = args.run_by_hash_num
    hash_total = args.run_by_hash_total

    if hash_num is not None and hash_total is not None:
        # A bucket equal to the total (or a negative one) can never match
        # `hash % total` and would silently select no tests; a total of 0
        # would fail with ZeroDivisionError later.
        if not 0 <= hash_num < hash_total:
            raise TestException(
                f"Incorrect run by hash, value {hash_num} must be in range [0, {hash_total})"
            )

        def filter_func(x: str) -> bool:
            return bool(stringhash(x) % hash_total == hash_num)

    else:

        def filter_func(x: str) -> bool:
            _ = x
            return True

    self.all_tests: List[str] = self.get_tests_list(
        self.tests_in_suite_key_func, filter_func
    )

    self.all_tags: Dict[str, Set[str]] = self.read_test_tags(
        self.suite_path, self.all_tests
    )

    self.sequential_tests = []
    self.parallel_tests = []
    for test_name in self.all_tests:
        if self.is_sequential_test(test_name):
            self.sequential_tests.append(test_name)
        else:
            self.parallel_tests.append(test_name)
def is_sequential_test(self, test_name):
    """
    Tell whether the test must not be run in parallel with other tests.

    A test is sequential when its name contains one of the patterns from
    the `sequential` argument, or when it is tagged "no-parallel" or
    "sequential".

    The arguments are taken from self.args, like in the other methods of the
    suite, instead of relying on the module-level `args`.
    """
    sequential_patterns = self.args.sequential
    if sequential_patterns and any(s in test_name for s in sequential_patterns):
        return True

    if test_name not in self.all_tags:
        return False

    tags = self.all_tags[test_name]
    return ("no-parallel" in tags) or ("sequential" in tags)
def get_tests_list(self, sort_key, filter_func):
    """
    Return list of tests file names to run

    Every selected test is repeated `test_runs` times and the resulting list
    is sorted with `sort_key`.
    """
    selected = list(self.get_selected_tests(filter_func))
    return sorted(selected * self.args.test_runs, key=sort_key)
def get_selected_tests(self, filter_func):
    """
    Walk the suite directory and yield the names of the tests to run.

    A file is yielded (after being passed through ``render_test_template``)
    when it is a test file, matches at least one regex from ``self.args.test``
    (if any were given) and is accepted by ``filter_func``.
    """
    if USE_JINJA:
        template_env = jinja2.Environment(
            loader=jinja2.FileSystemLoader(self.suite_path),
            keep_trailing_newline=True,
        )
        template_env.globals.update(product=itertools.product)
    else:
        template_env = None

    for candidate in os.listdir(self.suite_path):
        if not is_test_from_dir(self.suite_path, candidate):
            continue

        requested = self.args.test
        if requested and not any(re.search(rx, candidate) for rx in requested):
            continue

        # *.gen.sql files are never selected directly when jinja2 is available
        # (presumably they are produced from templates - verify against
        # render_test_template).
        skip_generated = USE_JINJA and candidate.endswith(".gen.sql")
        if skip_generated or not filter_func(candidate):
            continue

        yield self.render_test_template(template_env, self.suite_path, candidate)
@staticmethod
def read_test_suite(args, suite_dir_name: str):
def is_data_present():
try:
return int(clickhouse_execute(args, "EXISTS TABLE test.hits"))
except Exception as e:
print(
"Cannot check if dataset is available, assuming it's not: ", str(e)
)
return False
base_dir = os.path.abspath(args.queries)
tmp_dir = os.path.abspath(args.tmp)
suite_path = os.path.join(base_dir, suite_dir_name)
2022-04-27 11:02:45 +00:00
suite_re_obj = re.search("^[0-9]+_(.*)$", suite_dir_name)
if not suite_re_obj: # skip .gitignore and so on
return None
suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
if not os.path.exists(suite_tmp_path):
os.makedirs(suite_tmp_path)
suite = suite_re_obj.group(1)
if not os.path.isdir(suite_path):
return None
2022-04-27 11:02:45 +00:00
if "stateful" in suite and not args.no_stateful and not is_data_present():
print("Won't run stateful tests because test data wasn't loaded.")
return None
2022-04-27 11:02:45 +00:00
if "stateless" in suite and args.no_stateless:
print("Won't run stateless tests because they were manually disabled.")
return None
2022-04-27 11:02:45 +00:00
if "stateful" in suite and args.no_stateful:
print("Won't run stateful tests because they were manually disabled.")
return None
2021-08-06 14:38:28 +00:00
return TestSuite(args, suite_path, suite_tmp_path, suite)
2021-08-06 14:38:28 +00:00
# Module-level state shared by main(), run_tests_array() and do_run_tests().
# Everything is a None placeholder here; the real objects are assigned
# elsewhere (presumably at start-up, outside this part of the file).

# Point in time (as returned by time()) after which no new test is started.
stop_time = None
# Holder of the process exit code; failures store 1 into its ``.value``.
exit_code = None
# Event-like flag (``.set()`` / ``.is_set()``) raised once the server is considered dead.
server_died = None
# NOTE(review): not referenced in this part of the file - presumably the
# multiprocessing.Manager that owns the shared objects above; confirm.
multiprocessing_manager = None
# Results of test runs that were retried; printed by main() at the end.
restarted_tests = None
2022-04-27 11:17:54 +00:00
class ServerDied(Exception):
    """Raised to abort a test run because the server is considered dead."""
class GlobalTimeout(Exception):
    """Raised to abort a test run because the global time limit is exceeded."""
def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite, bool]):
    """
    Run test cases taken from ``all_tests`` until the list is exhausted.

    :param all_tests_with_params: tuple ``(all_tests, num_tests, test_suite,
        is_concurrent)``; ``all_tests`` is consumed with ``pop(0)`` and may be
        a list shared between several worker processes, ``num_tests`` is only
        used for the informational "Running ... tests" message
    :raises ServerDied: when the server is flagged as dead, a test fails with
        ``FailureReason.SERVER_DIED`` or ``args.max_failures_chain``
        consecutive failures are reached
    :raises GlobalTimeout: when the global time limit is exceeded
    """
    (
        all_tests,
        num_tests,
        test_suite,
        is_concurrent,
    ) = all_tests_with_params
    global stop_time
    global exit_code
    global server_died
    global restarted_tests

    OP_SQUARE_BRACKET = colored("[", args, attrs=["bold"])
    CL_SQUARE_BRACKET = colored("]", args, attrs=["bold"])

    MSG_FAIL = (
        OP_SQUARE_BRACKET
        + colored(" FAIL ", args, "red", attrs=["bold"])
        + CL_SQUARE_BRACKET
    )
    MSG_UNKNOWN = (
        OP_SQUARE_BRACKET
        + colored(" UNKNOWN ", args, "yellow", attrs=["bold"])
        + CL_SQUARE_BRACKET
    )
    MSG_OK = (
        OP_SQUARE_BRACKET
        + colored(" OK ", args, "green", attrs=["bold"])
        + CL_SQUARE_BRACKET
    )
    MSG_SKIPPED = (
        OP_SQUARE_BRACKET
        + colored(" SKIPPED ", args, "cyan", attrs=["bold"])
        + CL_SQUARE_BRACKET
    )
    MESSAGES = {
        TestStatus.FAIL: MSG_FAIL,
        TestStatus.UNKNOWN: MSG_UNKNOWN,
        TestStatus.OK: MSG_OK,
        TestStatus.SKIPPED: MSG_SKIPPED,
    }

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures_chain = 0
    start_time = datetime.now()

    client_options = get_additional_client_options(args)

    if num_tests > 0:
        about = "about " if is_concurrent else ""
        proc_name = multiprocessing.current_process().name
        print(f"Running {about}{num_tests} {test_suite.suite} tests ({proc_name}).")

    # This is the upper level timeout.
    # It helps with completely frozen processes, like in case of gdb errors
    def timeout_handler(signum, frame):
        raise TimeoutError("Test execution timed out")

    while True:
        # The list may be shared with other workers, so it can become empty
        # between the emptiness check and the pop.
        if all_tests:
            try:
                case = all_tests.pop(0)
            except IndexError:
                break
        else:
            break

        if server_died.is_set():
            stop_tests()
            raise ServerDied("Server died")

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            stop_tests()
            raise GlobalTimeout("Stop tests run because global time limit is exceeded")

        test_case = TestCase(test_suite, case, args, is_concurrent)

        try:
            description = ""
            test_case_name = removesuffix(test_case.name, ".gen", ".sql") + ": "

            if is_concurrent:
                description = f"{test_case_name:72}"
            else:
                sys.stdout.flush()
                sys.stdout.write(f"{test_case_name:72}")
                # This flush is needed so you can see the test name of the long
                # running test before it will finish. But don't do it in parallel
                # mode, so that the lines don't mix.
                sys.stdout.flush()

            while True:
                signal.signal(signal.SIGALRM, timeout_handler)
                signal.alarm(int(args.timeout * 1.1))
                test_result = None
                try:
                    test_result = test_case.run(
                        args, test_suite, client_options, server_logs_level
                    )
                    test_result = test_case.process_result(test_result, MESSAGES)
                    # No ``break`` here: whether to leave the loop is decided
                    # below, otherwise ``need_retry`` would never be honoured.
                except TimeoutError:
                    # Discard a possibly half-processed result of the timed
                    # out attempt.
                    test_result = None
                    break
                finally:
                    signal.alarm(0)

                if not test_result or not test_result.need_retry:
                    break
                restarted_tests.append(test_result)

            if test_result is None:
                # The upper level timeout fired, so there is no result object
                # to inspect: account the case as a failure explicitly.
                description += f"{MSG_FAIL} - Test execution timed out\n"
                sys.stdout.write(description)
                sys.stdout.flush()
                failures_total += 1
                failures_chain += 1
            else:
                # First print the description, than invoke the check result logic
                description += test_result.description

                if description and not description.endswith("\n"):
                    description += "\n"

                sys.stdout.write(description)
                sys.stdout.flush()

                if test_result.status == TestStatus.OK:
                    passed_total += 1
                    failures_chain = 0
                elif test_result.status == TestStatus.FAIL:
                    failures_total += 1
                    failures_chain += 1
                    if test_result.reason == FailureReason.SERVER_DIED:
                        stop_tests()
                        server_died.set()
                        raise ServerDied("Server died")
                elif test_result.status == TestStatus.SKIPPED:
                    skipped_total += 1

        except KeyboardInterrupt as e:
            print(colored("Break tests execution", args, "red"))
            stop_tests()
            raise e

        if failures_chain >= args.max_failures_chain:
            stop_tests()
            raise ServerDied("Max failures chain")

    if failures_total > 0:
        print(
            colored(
                f"\nHaving {failures_total} errors! {passed_total} tests passed."
                f" {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "red",
                attrs=["bold"],
            )
        )
        exit_code.value = 1
    else:
        print(
            colored(
                f"\n{passed_total} tests passed. {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "green",
                attrs=["bold"],
            )
        )

    sys.stdout.flush()
# Log level passed to every TestCase.run() call by run_tests_array() and used
# by main() as the default for CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL.
server_logs_level = "warning"
def check_server_started(args):
    """
    Wait until the server answers a simple query.

    Connection errors are retried up to ``args.server_check_retries`` times
    with a 0.5 second pause; a timeout or any other exception gives up
    immediately.

    :return: True when the server answered, False otherwise
    """
    print("Connecting to ClickHouse server...", end="")

    sys.stdout.flush()
    retry_count = args.server_check_retries
    query = "SELECT version(), arrayStringConcat(groupArray(value), ' ') FROM system.build_options WHERE name IN ('GIT_HASH', 'GIT_BRANCH')"
    while retry_count > 0:
        try:
            res = (
                str(clickhouse_execute(args, query).decode())
                .strip()
                .replace("\t", " @ ")
            )
            print(" OK")
            print(f"Connected to server {res}")
            sys.stdout.flush()
            return True
        except (ConnectionError, http.client.ImproperConnectionState) as e:
            if args.hung_check:
                print("Connection error, will retry: ", str(e))
            else:
                print(".", end="")
            sys.stdout.flush()
            retry_count -= 1
            sleep(0.5)
            continue
        except TimeoutError:
            print("\nConnection timeout, will not retry")
            break
        except Exception as e:
            print(
                "\nUnexpected exception, will not retry: ",
                type(e).__name__,
                ": ",
                str(e),
            )
            break

    print("\nAll connection tries failed")
    sys.stdout.flush()
    return False
2022-04-27 11:02:45 +00:00
class BuildFlags:
    """Names of the flags that collect_build_flags() can put into its result."""

    # Sanitizers / instrumentation detected from CXX_FLAGS
    THREAD = "tsan"
    ADDRESS = "asan"
    UNDEFINED = "ubsan"
    MEMORY = "msan"
    SANITIZE_COVERAGE = "sanitize-coverage"

    # Build type
    DEBUG = "debug"
    RELEASE = "release"

    # Server settings
    ORDINARY_DATABASE = "ordinary-database"
    POLYMORPHIC_PARTS = "polymorphic-parts"
2020-07-03 10:57:16 +00:00
def collect_build_flags(args):
    """
    Query the server and return the list of flags that describe its build
    and configuration: at most one sanitizer/coverage flag, the build type,
    database/parts settings, every enabled ``USE_*`` build option (lowercased)
    and ``cpu-<processor>``.
    """
    flags = []

    cxx_flags = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'"
    )
    # Only the first matching marker counts, in this order.
    instrumentation = (
        (b"-fsanitize=thread", BuildFlags.THREAD),
        (b"-fsanitize=address", BuildFlags.ADDRESS),
        (b"-fsanitize=undefined", BuildFlags.UNDEFINED),
        (b"-fsanitize=memory", BuildFlags.MEMORY),
        (b"-DSANITIZE_COVERAGE=1", BuildFlags.SANITIZE_COVERAGE),
    )
    for marker, flag in instrumentation:
        if marker in cxx_flags:
            flags.append(flag)
            break

    build_type = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
    )
    if b"Debug" in build_type:
        flags.append(BuildFlags.DEBUG)
    elif b"RelWithDebInfo" in build_type or b"Release" in build_type:
        flags.append(BuildFlags.RELEASE)

    ordinary_allowed = clickhouse_execute(
        args,
        "SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'",
    )
    if ordinary_allowed == b"1" or args.db_engine == "Ordinary":
        flags.append(BuildFlags.ORDINARY_DATABASE)

    min_bytes_for_wide_part = int(
        clickhouse_execute(
            args,
            "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'",
        )
    )
    if min_bytes_for_wide_part == 0:
        flags.append(BuildFlags.POLYMORPHIC_PARTS)

    enabled_options = clickhouse_execute(
        args,
        "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')",
    )
    flags.extend(
        option.decode().lower() for option in enabled_options.strip().splitlines()
    )

    system_processor = clickhouse_execute(
        args,
        "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1",
    ).strip()
    if system_processor:
        flags.append(f"cpu-{system_processor.decode().lower()}")

    return flags
2022-04-27 11:02:45 +00:00
2022-07-07 22:16:01 +00:00
def collect_changed_merge_tree_settings(args):
    """Return the names of the MergeTree settings the server reports as changed."""
    response = clickhouse_execute(
        args,
        "SELECT name FROM system.merge_tree_settings WHERE changed",
    )
    return [raw_name.decode() for raw_name in response.strip().splitlines()]
def check_table_column(args, database, table, column):
    """Return True if ``system.columns`` lists ``column`` for ``database.table``."""
    query = f"""
            SELECT count()
            FROM system.columns
            WHERE database = '{database}' AND table = '{table}' AND name = '{column}'
            """
    matches = int(clickhouse_execute(args, query))
    return matches > 0
2020-07-03 10:57:16 +00:00
2022-04-28 11:26:49 +00:00
def suite_key_func(item: str) -> Union[float, Tuple[int, str]]:
    """
    Sort key for suite directory names.

    With ``--order random`` a random number is returned. Otherwise names of
    the form ``<number>_<rest>`` sort by ``(number, rest)``; names without an
    underscore or with a non-numeric prefix sort after them.
    """
    if args.order == "random":
        return random.random()

    head, underscore, tail = item.partition("_")
    if not underscore:
        return 99998, ""

    try:
        number = int(head)
    except ValueError:
        return 99997, ""
    return number, tail
def extract_key(key: str) -> str:
    """
    Run the extract-from-config program against the server config and return
    its output. ``key`` is appended verbatim to the shell command line.
    """
    command = "".join(
        (args.extract_from_config, " --try --config ", args.configserver, key)
    )
    _status, output = subprocess.getstatusoutput(command)
    return output
def run_tests_process(*positional, **named):
    """Entry point of a worker process: forwards everything to run_tests_array()."""
    result = run_tests_array(*positional, **named)
    return result
def do_run_tests(jobs, test_suite: TestSuite):
    """
    Run all tests of a suite.

    With more than one job and at least one parallel test, the parallel tests
    are run by worker processes that take them from a shared list, and the
    sequential tests are run afterwards in this process. Otherwise all tests
    are run in this process.

    :param jobs: maximum number of worker processes
    :param test_suite: the suite to run
    :return: number of tests of the suite that were scheduled to run
    """
    if jobs > 1 and len(test_suite.parallel_tests) > 0:
        # Remember the sizes now: run_tests_array() pops the tests from the
        # list it is given, so the lengths are not reliable afterwards.
        parallel_count = len(test_suite.parallel_tests)
        sequential_count = len(test_suite.sequential_tests)
        print(
            "Found",
            parallel_count,
            "parallel tests and",
            sequential_count,
            "sequential tests",
        )
        jobs = min(jobs, parallel_count)

        # If we don't do random shuffling then there will be always
        # nearly the same groups of test suites running concurrently.
        # Thus, if there is a test within group which appears to be broken
        # then it will affect all other tests in a non-random form.
        # So each time a bad test fails - other tests from the group will also fail
        # and this process will be more or less stable.
        # It makes it more difficult to detect real flaky tests,
        # because the distribution and the amount
        # of failures will be nearly the same for all tests from the group.
        random.shuffle(test_suite.parallel_tests)

        # Only an estimate of the per-worker share, used for the
        # "Running about N tests" message.
        batch_size = parallel_count // jobs
        manager = multiprocessing.Manager()
        parallel_tests = manager.list()
        parallel_tests.extend(test_suite.parallel_tests)

        processes = []
        for _ in range(jobs):
            process = multiprocessing.Process(
                target=run_tests_process,
                args=((parallel_tests, batch_size, test_suite, True),),
            )
            processes.append(process)
            process.start()

        while processes:
            sys.stdout.flush()
            # Periodically check the server for hangs
            # and stop all processes in this case
            try:
                clickhouse_execute(
                    args,
                    query="SELECT 1 /*hang up check*/",
                    max_http_retries=5,
                    timeout=20,
                )
            except Exception:
                print("Hang up check failed")
                server_died.set()

            if server_died.is_set():
                print("Server died, terminating all processes...")
                kill_gdb_if_any()
                # Wait for test results
                sleep(args.timeout)
                for p in processes:
                    if p.is_alive():
                        p.terminate()
                break

            for p in processes[:]:
                if not p.is_alive():
                    processes.remove(p)

            sleep(5)

        run_tests_array(
            (
                test_suite.sequential_tests,
                sequential_count,
                test_suite,
                False,
            )
        )

        return sequential_count + parallel_count
    else:
        num_tests = len(test_suite.all_tests)
        run_tests_array(
            (
                test_suite.all_tests,
                num_tests,
                test_suite,
                False,
            )
        )
        return num_tests
2021-05-20 18:11:12 +00:00
def is_test_from_dir(suite_dir, case):
    """
    Return True if ``case`` is a regular file in ``suite_dir`` whose name ends
    with one of ``TEST_FILE_EXTENSIONS``.
    """
    candidate = os.path.join(suite_dir, case)
    # We could also test for executable files (os.access(case_file, os.X_OK),
    # but it interferes with 01610_client_spawn_editor.editor, which is invoked
    # as a query editor in the test, and must be marked as executable.
    if not os.path.isfile(candidate):
        return False
    for extension in TEST_FILE_EXTENSIONS:
        if candidate.endswith(extension):
            return True
    return False
def removesuffix(text, *suffixes):
    """
    Variant of str.removesuffix (added in python 3.9,
    https://www.python.org/dev/peps/pep-0616/) that accepts several candidate
    suffixes: the first non-empty one that ``text`` ends with is removed,
    and at most one suffix is stripped.
    """
    matched = next(
        (candidate for candidate in suffixes if candidate and text.endswith(candidate)),
        None,
    )
    if matched is None:
        return text
    return text[: len(text) - len(matched)]
def reportCoverageFor(args, what, query, permissive=False):
    """
    Run ``query`` and report its output as the list of uncovered ``what``.

    :return: True when the query returned nothing; otherwise the report is
        printed and ``permissive`` is returned
    """
    uncovered = clickhouse_execute(args, query).decode()
    if uncovered == "":
        return True

    print(f"\nThe following {what} were not covered by tests:\n")
    print(uncovered)
    print("\n")
    return permissive
2022-08-26 21:57:14 +00:00
2023-10-29 16:21:45 +00:00
# This is high-level coverage on a per-component basis (functions, data types, etc.)
# Not to be confused with code coverage.
def reportCoverage(args):
    """
    Report server components that no query in ``system.query_log`` has used
    since yesterday.

    The checks run in order and stop at the first one that fails; the
    "functions" check is permissive, i.e. it only reports.

    :return: True if every executed check passed
    """
    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    # (what, query, permissive)
    checks = (
        (
            "functions",
            """
            SELECT name
            FROM system.functions
            WHERE NOT is_aggregate AND origin = 'System' AND alias_to = ''
                AND name NOT IN
                (
                    SELECT arrayJoin(used_functions) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            True,
        ),
        (
            "aggregate functions",
            """
            SELECT name
            FROM system.functions
            WHERE is_aggregate AND origin = 'System' AND alias_to = ''
                AND name NOT IN
                (
                    SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
        (
            "aggregate function combinators",
            """
            SELECT name
            FROM system.aggregate_function_combinators
            WHERE NOT is_internal
                AND name NOT IN
                (
                    SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
        (
            "data type families",
            """
            SELECT name
            FROM system.data_type_families
            WHERE alias_to = '' AND name NOT LIKE 'Interval%'
                AND name NOT IN
                (
                    SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
                )
            ORDER BY name
        """,
            False,
        ),
    )

    # all() over a generator short-circuits exactly like the chained ``and``.
    return all(
        reportCoverageFor(args, what, query, permissive)
        for what, query, permissive in checks
    )
2023-01-13 19:34:31 +00:00
def reportLogStats(args):
    """
    Print several statistics reports about the messages in ``system.text_log``.

    Each report runs one query and prints a title followed by the raw output.
    """

    def show(title, query):
        # The query is executed first, so nothing is printed if it fails.
        output = clickhouse_execute(args, query).decode(errors="replace")
        print(title)
        print(output)
        print("\n")

    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    show(
        "\nTop patterns of log messages:\n",
        """
        WITH
            240 AS mins,
            (
                SELECT (count(), sum(length(toValidUTF8(message))))
                FROM system.text_log
                WHERE (now() - toIntervalMinute(mins)) < event_time
            ) AS total
        SELECT
            count() AS count,
            round(count / (total.1), 3) AS `count_%`,
            formatReadableSize(sum(length(toValidUTF8(message)))) AS size,
            round(sum(length(toValidUTF8(message))) / (total.2), 3) AS `size_%`,
            countDistinct(logger_name) AS uniq_loggers,
            countDistinct(thread_id) AS uniq_threads,
            groupArrayDistinct(toString(level)) AS levels,
            round(sum(query_id = '') / count, 3) AS `background_%`,
            message_format_string
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time
        GROUP BY message_format_string
        ORDER BY count DESC
        LIMIT 100
        FORMAT PrettySpaceNoEscapes
    """,
    )

    show(
        "\nTop messages without format string (fmt::runtime):\n",
        """
        WITH
            240 AS mins
        SELECT
            count() AS count,
            substr(replaceRegexpAll(toValidUTF8(message), '[^A-Za-z]+', ''), 1, 32) AS pattern,
            substr(any(toValidUTF8(message)), 1, 256) as runtime_message,
            any((extract(source_file, '/[a-zA-Z0-9_]+\\.[a-z]+'), source_line)) as line
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
        GROUP BY pattern
        ORDER BY count DESC
        LIMIT 30
        FORMAT PrettySpaceNoEscapes
    """,
    )

    show(
        "\nTop messages not matching their format strings:\n",
        """
        SELECT message_format_string, count(), any(toValidUTF8(message)) AS any_message
        FROM system.text_log
        WHERE (now() - toIntervalMinute(240)) < event_time
        AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
        AND (message NOT LIKE concat('%Exception: ', s, '%'))
        GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT PrettySpaceNoEscapes
    """,
    )

    show(
        "\nTop short messages:\n",
        """
        WITH ('', '({}) Keys: {}', '({}) {}', 'Aggregating', 'Became leader', 'Cleaning queue',
              'Creating set.', 'Cyclic aliases', 'Detaching {}', 'Executing {}', 'Fire events: {}',
              'Found part {}', 'Loaded queue', 'No sharding key', 'No tables', 'Query: {}',
              'Removed', 'Removed part {}', 'Removing parts.', 'Request URI: {}', 'Sending part {}',
              'Sent handshake', 'Starting {}', 'Will mimic {}', 'Writing to {}', 'dropIfEmpty',
              'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}', '{}%', 'Read object: {}',
              'New segment: {}', 'Convert overflow', 'Division by zero', 'Files set to {}',
              'Bytes set to {}', 'Numeric overflow', 'Invalid mode: {}',
              'Write file: {}', 'Unable to parse JSONPath', 'Host is empty in S3 URI.', 'Expected end of line',
              'inflate failed: {}{}', 'Center is not valid', 'Column ''{}'' is ambiguous', 'Cannot parse object', 'Invalid date: {}',
              'There is no cache by name: {}', 'No part {} in table', '`{}` should be a String', 'There are duplicate id {}',
              'Invalid replica name: {}', 'Unexpected value {} in enum', 'Unknown BSON type: {}', 'Point is not valid',
              'Invalid qualified name: {}', 'INTO OUTFILE is not allowed', 'Arguments must not be NaN', 'Cell is not valid',
              'brotli decode error{}', 'Invalid H3 index: {}', 'Too large node state size', 'No additional keys found.',
              'Attempt to read after EOF.', 'Replication was stopped', '{} building file infos', 'Cannot parse uuid {}',
              'Query was cancelled', 'Cancelled merging parts', 'Cancelled mutating parts', 'Log pulling is cancelled',
              'Transaction was cancelled', 'Could not find table: {}', 'Table {} doesn''t exist',
              'Database {} doesn''t exist', 'Dictionary ({}) not found', 'Unknown table function {}',
              'Unknown format {}', 'Unknown explain kind ''{}''', 'Unknown setting {}', 'Unknown input format {}',
              'Unknown identifier: ''{}''', 'User name is empty', 'Expected function, got: {}',
              'Attempt to read after eof', 'String size is too big ({}), maximum: {}'
        ) AS known_short_messages
        SELECT count() AS c, message_format_string, substr(any(toValidUTF8(message)), 1, 120),
            min(if(length(regexpExtract(toValidUTF8(message), '(.*)\\([A-Z0-9_]+\\)')) as prefix_len > 0, prefix_len, length(toValidUTF8(message))) - 26 AS length_without_exception_boilerplate) AS min_length_without_exception_boilerplate
        FROM system.text_log
        WHERE (now() - toIntervalMinute(240)) < event_time
            AND (length(message_format_string) < 16
                OR (message ILIKE '%DB::Exception%' AND length_without_exception_boilerplate < 30))
            AND message_format_string NOT IN known_short_messages
        GROUP BY message_format_string ORDER BY c DESC LIMIT 50 FORMAT PrettySpaceNoEscapes
    """,
    )

    # NOTE(review): the inner count below looks at the last 240 minutes while
    # the outer select looks at the last 120 - possibly unintended; confirm.
    show(
        "\nTop messages by level:\n",
        """
        SELECT max((freq, message_format_string)), level
        FROM (SELECT count() / (SELECT count() FROM system.text_log
              WHERE (now() - toIntervalMinute(240)) < event_time) AS freq,
              min(level) AS level, message_format_string FROM system.text_log
              WHERE (now() - toIntervalMinute(120)) < event_time
              GROUP BY message_format_string ORDER BY freq DESC)
        GROUP BY level
    """,
    )
2022-08-26 21:57:14 +00:00
def try_get_skip_list(base_dir, name):
    """
    Read a list of test names to skip from a text file.

    The first whitespace-separated word of every line is a test name; the
    rest of the line is ignored. Blank lines, whitespace-only lines and lines
    that start with a space are skipped.

    :param base_dir: directory the file path is relative to
    :param name: file name (may contain ``..`` components)
    :return: list of test names, empty if the file does not exist
    """
    skip_list_path = os.path.join(base_dir, name)
    if not os.path.exists(skip_list_path):
        return []

    test_names_to_skip = []
    with open(skip_list_path, "r", encoding="utf-8") as fd:
        for line in fd.read().split("\n"):
            if line.startswith(" "):
                continue
            # split() yields nothing for empty or whitespace-only lines
            # (e.g. a lone tab or a stray "\r"), which must not crash.
            words = line.split()
            if words:
                test_names_to_skip.append(words[0])

    return test_names_to_skip
def main(args):
    """
    Run all test suites found in ``args.queries`` and exit the process.

    Steps: wait for the server, collect build flags, export environment
    defaults for the tests, create the common databases, run every suite,
    optionally check for hung queries, print client logs and restarted tests,
    optionally print log statistics and coverage, then ``sys.exit`` with the
    shared exit code (or 1 if no tests were run).

    :param args: parsed command-line arguments
    :raises TestException: if the server does not respond
    """
    global server_died
    global stop_time
    global exit_code
    global server_logs_level
    global restarted_tests

    if not check_server_started(args):
        msg = "Server is not responding. Cannot execute 'SELECT 1' query."
        if args.hung_check:
            print(msg)
            pid = get_server_pid()
            print("Got server pid", pid)
            print_stacktraces()
        raise TestException(msg)

    args.build_flags = collect_build_flags(args)
    args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)

    # Random settings are turned off for S3 storage on non-release builds.
    if args.s3_storage and (BuildFlags.RELEASE not in args.build_flags):
        args.no_random_settings = True

    if args.skip:
        args.skip = set(args.skip)

    # With this option args.skip is always a set, even if it was not given.
    if args.replace_replicated_with_shared:
        if not args.skip:
            args.skip = set([])
        args.skip = set(args.skip)

    base_dir = os.path.abspath(args.queries)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    # os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)

    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    if args.zookeeper is None:
        args.zookeeper = True

    # Shard tests are enabled when the config lists one of these listen hosts.
    if args.shard is None:
        args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))

    def create_common_database(args, db_name):
        # Try to create the database up to MAX_RETRIES times; an HTTP error is
        # retried only while need_retry() says so, otherwise it is dropped.
        create_database_retries = 0
        while create_database_retries < MAX_RETRIES:
            start_time = datetime.now()
            try:
                clickhouse_execute(
                    args,
                    f"CREATE DATABASE IF NOT EXISTS {db_name} "
                    f"{get_db_engine(args, db_name)}",
                    settings=get_create_database_settings(args, None),
                )
                break
            except HTTPError as e:
                total_time = (datetime.now() - start_time).total_seconds()
                if not need_retry(args, e.message, e.message, total_time):
                    break
            create_database_retries += 1

    try:
        if args.database and args.database != "test":
            create_common_database(args, args.database)

        create_common_database(args, "test")
    except Exception as e:
        # Any other failure here is treated as a dead server: no suite is run.
        print(f"Failed to create databases for tests: {e}")
        server_died.set()

    if (
        args.collect_per_test_coverage
        and BuildFlags.SANITIZE_COVERAGE in args.build_flags
    ):
        clickhouse_execute(
            args,
            """
                CREATE TABLE IF NOT EXISTS system.coverage_log
                (
                    time DateTime,
                    test_name String,
                    coverage Array(UInt64)
                ) ENGINE = MergeTree ORDER BY test_name
                COMMENT 'Contains information about per-test coverage from the CI, but used only for exporting to the CI cluster';
            """,
        )

        # Coverage collected at the system startup before running any tests:
        clickhouse_execute(
            args,
            "INSERT INTO system.coverage_log SELECT now(), '', coverageCurrent()",
        )

    total_tests_run = 0

    cloud_skip_list = try_get_skip_list(base_dir, "../queries-no-cloud-tests.txt")
    private_skip_list = try_get_skip_list(base_dir, "../queries-no-private-tests.txt")

    # Suites are visited in suite_key_func order until the server dies.
    for suite in sorted(os.listdir(base_dir), key=suite_key_func):
        if server_died.is_set():
            break

        test_suite = TestSuite.read_test_suite(args, suite)
        if test_suite is None:
            continue

        test_suite.cloud_skip_list = cloud_skip_list
        test_suite.private_skip_list = private_skip_list
        total_tests_run += do_run_tests(args.jobs, test_suite)

    if server_died.is_set():
        exit_code.value = 1

    if args.hung_check:
        # Some queries may execute in background for some time after test was finished. This is normal.

        print("Checking the hung queries: ", end="")
        hung_count = 0
        try:
            # Poll the processlist for up to 90 seconds.
            # NOTE(review): there is no pause between polls in this loop.
            deadline = datetime.now() + timedelta(seconds=90)
            while datetime.now() < deadline:
                hung_count = get_processlist_size(args)
                if hung_count == 0:
                    print(" done")
                    break
                print(". ", end="")
        except Exception as e:
            print(
                colored(
                    "\nHung check failed. Failed to get processlist size: " + str(e),
                    args,
                    "red",
                    attrs=["bold"],
                )
            )
            exit_code.value = 1

        processlist = ""
        if hung_count > 0:
            try:
                processlist = get_processlist_with_stacktraces(args)
            except Exception as e:
                print(
                    colored(
                        "\nHung check failed. Failed to get processlist with stacktraces: "
                        + str(e),
                        args,
                        "red",
                        attrs=["bold"],
                    )
                )
                exit_code.value = 1

        if processlist:
            print(
                colored(
                    "\nFound hung queries in processlist:", args, "red", attrs=["bold"]
                )
            )
            print(processlist.decode())
            print(get_transactions_list(args))

            print_stacktraces()
            exit_code.value = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    # Print the client log (if it exists) and remove the file afterwards.
    if args.client_log:
        if os.path.exists(args.client_log):
            with open(args.client_log, "rb") as stream:
                content = stream.read().decode()
                if len(content):
                    print("Has fatal logs from client:\n")
                    print(content)
            os.remove(args.client_log)

    if len(restarted_tests) > 0:
        print("\nSome tests were restarted:\n")

        for test_result in restarted_tests:
            print(f"\n{test_result.case_name:72}: ")
            # replace it with lowercase to avoid parsing retried tests as failed
            for status in TestStatus:
                test_result.description = test_result.description.replace(
                    status.value, status.value.lower()
                )
            print(test_result.description)

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)
    else:
        print("All tests have finished.")

    # Failing to collect log statistics is reported but is not an error.
    if args.report_logs_stats:
        try:
            reportLogStats(args)
        except Exception as e:
            print(f"Failed to get stats about log messages: {e}")

    if args.report_coverage and not reportCoverage(args):
        exit_code.value = 1

    sys.exit(exit_code.value)
2019-01-24 11:02:55 +00:00
def find_binary(name):
    """
    Locate an executable.

    ``name`` is returned as is when it already points to an executable;
    otherwise it is looked up in the directories of ``PATH`` and then in
    ``/usr/local/bin`` and ``/usr/bin``.

    :param name: path to, or name of, the binary
    :return: path of the first executable found
    :raises TestException: if nothing executable was found
    """
    if os.access(name, os.X_OK):
        return name

    # PATH may be unset, and its separator is platform specific.
    paths = os.environ.get("PATH", "").split(os.pathsep)
    # maybe it wasn't in PATH
    paths += ["/usr/local/bin", "/usr/bin"]
    for path in paths:
        bin_path = os.path.join(path, name)
        if os.access(bin_path, os.X_OK):
            return bin_path

    raise TestException(f"{name} was not found in PATH")
2019-06-17 16:50:31 +00:00
def find_clickhouse_command(binary, command):
    """
    Return how to invoke a clickhouse sub-command: the ``<binary>-<command>``
    file when it is executable, otherwise ``<binary> <command>``.
    """
    dashed = f"{binary}-{command}"
    if not os.access(dashed, os.X_OK):
        # To avoid requiring symlinks (in case you download binary from CI)
        return f"{binary} {command}"
    return dashed
2019-06-17 16:50:31 +00:00
def get_additional_client_options(args):
    """
    Build the extra command-line options for the client: the value of the
    ``CLICKHOUSE_CLIENT_OPT`` environment variable (if set) followed by every
    ``args.client_option`` prefixed with ``--``, separated by spaces.
    """
    pieces = []
    if "CLICKHOUSE_CLIENT_OPT" in os.environ:
        pieces.append(os.environ["CLICKHOUSE_CLIENT_OPT"])
    if args.client_option:
        pieces.append(" ".join("--" + option for option in args.client_option))
    return " ".join(pieces)
def get_additional_client_options_url(args):
    """Return ``args.client_option`` joined with ``&``, or an empty string if there are none."""
    return "&".join(args.client_option) if args.client_option else ""
def parse_args():
    """Define the command-line interface and parse ``sys.argv``.

    Returns the ``argparse.Namespace`` produced by ``ArgumentParser.parse_args``.
    The ``--binary`` value is resolved through ``find_binary`` while parsing,
    so an exception raised by that lookup propagates to the caller.

    Note: argparse %-formats every ``help`` string, so a literal percent sign
    in help text must be written as ``%%``.
    """
    parser = ArgumentParser(description="ClickHouse functional tests")
    parser.add_argument("-q", "--queries", help="Path to queries dir")
    parser.add_argument("--tmp", help="Path to tmp dir")

    parser.add_argument(
        "-b",
        "--binary",
        default="clickhouse",
        type=find_binary,
        help="Path to clickhouse binary or name of binary in PATH",
    )
    parser.add_argument(
        "-c",
        "--client",
        help="Path to clickhouse-client (deprecated and ignored, use --binary instead)",
    )
    parser.add_argument("--extract_from_config", help="extract-from-config program")
    parser.add_argument(
        "--configclient", help="Client config (if you do not use default ports)"
    )
    parser.add_argument(
        "--configserver",
        default="/etc/clickhouse-server/config.xml",
        help="Preprocessed server config",
    )
    parser.add_argument(
        "-o", "--output", help="Output xUnit compliant test report directory"
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=600,
        help="Timeout for each test case in seconds",
    )
    parser.add_argument(
        "--global_time_limit",
        type=int,
        help="Stop if executing more than specified time (after current test is finished)",
    )
    parser.add_argument("test", nargs="*", help="Optional test case name regex")
    parser.add_argument(
        "-d",
        "--disabled",
        action="store_true",
        default=False,
        help="Also run disabled tests",
    )
    parser.add_argument(
        "--stop",
        action="store_true",
        default=None,
        dest="stop",
        help="Stop on network errors",
    )
    parser.add_argument(
        "--order", default="desc", choices=["asc", "desc", "random"], help="Run order"
    )
    parser.add_argument(
        "--testname",
        action="store_true",
        default=None,
        dest="testname",
        help="Make query with test name before test run",
    )
    parser.add_argument("--hung-check", action="store_true", default=False)
    parser.add_argument("--no-left-queries-check", action="store_true", default=False)
    parser.add_argument("--force-color", action="store_true", default=False)
    parser.add_argument(
        "--database", help="Database for tests (random name test_XXXXXX by default)"
    )
    parser.add_argument(
        "--no-drop-if-fail",
        action="store_true",
        help="Do not drop database for test if test has failed",
    )
    parser.add_argument(
        "--hide-db-name",
        action="store_true",
        help='Replace random database name with "default" in stderr',
    )
    parser.add_argument(
        "--parallel", default="1/1", help="One parallel test run number/total"
    )
    # nargs="?" without const: a bare "-j" stores None, which the caller
    # is expected to replace with a concrete job count.
    parser.add_argument(
        "-j", "--jobs", default=1, nargs="?", type=int, help="Run all tests in parallel"
    )
    parser.add_argument(
        "--test-runs",
        default=1,
        nargs="?",
        type=int,
        help="Run each test many times (useful for e.g. flaky check)",
    )
    parser.add_argument(
        "-U",
        "--unified",
        default=3,
        type=int,
        help="output NUM lines of unified context",
    )
    parser.add_argument(
        "-r",
        "--server-check-retries",
        default=180,
        type=int,
        help="Num of tries to execute SELECT 1 before tests started",
    )
    parser.add_argument("--db-engine", help="Database engine name")
    parser.add_argument(
        "--replicated-database",
        action="store_true",
        default=False,
        help="Run tests with Replicated database engine",
    )
    parser.add_argument(
        "--fast-tests-only",
        action="store_true",
        default=False,
        help='Run only fast tests (the tests without the "no-fasttest" tag)',
    )
    parser.add_argument(
        "--no-stateless", action="store_true", help="Disable all stateless tests"
    )
    parser.add_argument(
        "--no-stateful", action="store_true", help="Disable all stateful tests"
    )
    parser.add_argument("--skip", nargs="+", help="Skip these tests")
    parser.add_argument(
        "--sequential",
        nargs="+",
        help="Run these tests sequentially even if --parallel specified",
    )
    parser.add_argument(
        "--no-long", action="store_true", dest="no_long", help="Do not run long tests"
    )
    parser.add_argument(
        "--client-option", nargs="+", help="Specify additional client argument"
    )
    parser.add_argument(
        "--print-time", action="store_true", dest="print_time", help="Print test time"
    )
    parser.add_argument(
        "--check-zookeeper-session",
        action="store_true",
        help="Check ZooKeeper session uptime to determine if failed test should be retried",
    )
    parser.add_argument(
        "--s3-storage",
        action="store_true",
        default=False,
        help="Run tests over s3 storage",
    )
    parser.add_argument(
        "--azure-blob-storage",
        action="store_true",
        default=False,
        help="Run tests over azure blob storage",
    )
    parser.add_argument(
        "--no-random-settings",
        action="store_true",
        default=False,
        help="Disable settings randomization",
    )
    parser.add_argument(
        "--no-random-merge-tree-settings",
        action="store_true",
        default=False,
        help="Disable MergeTree settings randomization",
    )
    # "%%" renders as a literal "%" in --help; a bare "%" would be consumed
    # as a format specifier by argparse.
    parser.add_argument(
        "--run-by-hash-num",
        type=int,
        help="Run tests matching crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )
    parser.add_argument(
        "--run-by-hash-total",
        type=int,
        help="Total test groups for crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )
    parser.add_argument(
        "--show-whitespaces-in-diff",
        action="store_true",
        help="Display $ characters after line with trailing whitespaces in diff output",
    )

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--cloud",
        action="store_true",
        default=None,
        dest="cloud",
        help="Run only tests that are supported in ClickHouse Cloud environment",
    )
    group.add_argument(
        "--no-cloud",
        action="store_false",
        default=None,
        dest="cloud",
        help="Run all the tests, including the ones not supported in ClickHouse Cloud environment",
    )
    parser.set_defaults(cloud=False)

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--private",
        action="store_true",
        default=None,
        dest="private",
        help="Run only tests that are supported in the private build",
    )
    group.add_argument(
        "--no-private",
        action="store_false",
        default=None,
        dest="private",
        help="Run all the tests, including the ones not supported in the private build",
    )
    # Only used to skip tests via "../queries-no-private-tests.txt", so it's fine to keep it enabled by default
    parser.set_defaults(private=True)

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--zookeeper",
        action="store_true",
        default=None,
        dest="zookeeper",
        help="Run zookeeper related tests",
    )
    group.add_argument(
        "--no-zookeeper",
        action="store_false",
        default=None,
        dest="zookeeper",
        help="Do not run zookeeper related tests",
    )

    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--shard",
        action="store_true",
        default=None,
        dest="shard",
        help="Run sharding related tests "
        "(required to clickhouse-server listen 127.0.0.2 127.0.0.3)",
    )
    group.add_argument(
        "--no-shard",
        action="store_false",
        default=None,
        dest="shard",
        help="Do not run shard related tests",
    )

    # TODO: Remove upgrade-check option after release 24.3 and use
    # ignore_drop_queries_probability option in stress.py as in stress tests
    # NOTE(review): this option is registered on the --shard/--no-shard
    # mutually exclusive group, so argparse rejects combining it with either
    # of those flags - confirm that this is intended.
    group.add_argument(
        "--upgrade-check",
        action="store_true",
        help="Run tests for further server upgrade testing by ignoring all "
        "drop queries in tests for collecting data from new version of server",
    )
    parser.add_argument(
        "--secure",
        action="store_true",
        default=False,
        help="Use secure connection to connect to clickhouse-server",
    )
    parser.add_argument(
        "--max-failures-chain",
        default=20,
        type=int,
        help="Max number of failed tests in a row (stop tests if higher)",
    )
    parser.add_argument(
        "--report-coverage",
        action="store_true",
        default=False,
        help="Check what high-level server components were covered by tests",
    )
    # NOTE(review): store_true combined with default=True means the value is
    # True whether or not the flag is passed (same for the next option).
    parser.add_argument(
        "--collect-per-test-coverage",
        action="store_true",
        default=True,
        help="Create `system.coverage_log` table on the server and collect information about low-level code coverage on a per test basis there",
    )
    parser.add_argument(
        "--reset-coverage-before-every-test",
        action="store_true",
        default=True,
        help="Collect isolated test coverage for every test instead of a cumulative. Useful only when tests are run sequentially.",
    )
    parser.add_argument(
        "--report-logs-stats",
        action="store_true",
        default=False,
        help="Report statistics about log messages",
    )
    parser.add_argument(
        "--no-parallel-replicas",
        action="store_true",
        default=False,
        help="Do not include tests that are not supported with parallel replicas feature",
    )
    # NOTE(review): the default is the raw environment string when the
    # variable is set, so any non-empty value (including "0") is truthy.
    parser.add_argument(
        "--replace-replicated-with-shared",
        action="store_true",
        default=os.environ.get("REPLACE_RMT_WITH_SMT", False),
        help="Replace ReplicatedMergeTree engine with SharedMergeTree",
    )
    parser.add_argument(
        "--replace-non-replicated-with-shared",
        action="store_true",
        default=os.environ.get("REPLACE_MT_WITH_SMT", False),
        help="Replace ordinary MergeTree engine with SharedMergeTree",
    )
    parser.add_argument(
        "--client-log",
        default="./client.fatal.log",
        help="Path to file for fatal logs from client",
    )

    return parser.parse_args()
class Terminated(KeyboardInterrupt):
    """Raised by ``signal_handler`` when a termination signal is received."""
def signal_handler(sig, frame):
    """Signal callback that turns the received signal into ``Terminated``.

    ``sig`` is embedded in the exception message; ``frame`` is not used.
    """
    message = f"Terminated with {sig} signal"
    raise Terminated(message)
# Script entry point: install signal handlers, parse the command line,
# locate the queries directory, derive client/connection settings from the
# arguments and the CLICKHOUSE_* environment variables, then run main().
if __name__ == "__main__":
    # Module-level state; exit_code is read back when main() finishes.
    stop_time = None
    exit_code = multiprocessing.Value("i", 0)
    server_died = multiprocessing.Event()
    multiprocessing_manager = multiprocessing.Manager()
    restarted_tests = multiprocessing_manager.list()

    # Move to a new process group and kill it at exit so that we don't have any
    # infinite tests processes left
    # (new process group is required to avoid killing some parent processes)
    os.setpgid(0, 0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    try:
        args = parse_args()
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)

    if args.queries and not os.path.isdir(args.queries):
        print(
            f"Cannot access the specified directory with queries ({args.queries})",
            file=sys.stderr,
        )
        sys.exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        args.queries = "queries"

    if not os.path.isdir(args.queries):
        # If we're running from the repo
        args.queries = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "queries"
        )

    if not os.path.isdir(args.queries):
        # Next we're going to try some system directories, don't write 'stdout' files into them.
        if args.tmp is None:
            args.tmp = "/tmp/clickhouse-test"

        args.queries = "/usr/local/share/clickhouse-test/queries"

    if not os.path.isdir(args.queries):
        args.queries = "/usr/share/clickhouse-test/queries"

    if not os.path.isdir(args.queries):
        print(
            "Failed to detect path to the queries directory. Please specify it with "
            "'--queries' option.",
            file=sys.stderr,
        )
        sys.exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.tmp is None:
        args.tmp = args.queries

    if args.client:
        print(
            "WARNING: --client option is deprecated and will be removed in the future, use --binary instead",
            file=sys.stderr,
        )

    # The user-supplied --client value is discarded: the client command is
    # always derived from --binary.
    args.client = find_clickhouse_command(args.binary, "client")

    if args.extract_from_config:
        print(
            "WARNING: --extract_from_config option is deprecated and will be removed in the future",
            file=sys.stderr,
        )
    args.extract_from_config = find_clickhouse_command(
        args.binary, "extract-from-config"
    )

    if args.configclient:
        args.client += " --config-file=" + args.configclient

    tcp_host = os.getenv("CLICKHOUSE_HOST")
    if tcp_host is not None:
        args.tcp_host = tcp_host
        args.client += f" --host={tcp_host}"
    else:
        args.tcp_host = "localhost"

    tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
    if tcp_port is not None:
        args.tcp_port = int(tcp_port)
        args.client += f" --port={tcp_port}"
    else:
        args.tcp_port = 9440 if args.secure else 9000
        if args.secure:
            # Export the secure default port through the environment.
            os.environ["CLICKHOUSE_PORT_TCP"] = str(args.tcp_port)

    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
    if http_port is not None:
        args.http_port = int(http_port)
    else:
        args.http_port = 8443 if args.secure else 8123
        os.environ["CLICKHOUSE_PORT_HTTP"] = str(args.http_port)
        if args.secure and os.getenv("CLICKHOUSE_PORT_HTTP_PROTO") is None:
            os.environ["CLICKHOUSE_PORT_HTTP_PROTO"] = "https"

    client_database = os.getenv("CLICKHOUSE_DATABASE")
    if client_database is not None:
        args.client += f" --database={client_database}"
        args.client_database = client_database
    else:
        args.client_database = "default"

    if args.upgrade_check:
        args.client += " --fake-drop"

    if args.client_option or args.secure:
        # Set options for client
        # get_additional_client_options() already returns any existing
        # CLICKHOUSE_CLIENT_OPT value followed by the --client-option
        # entries, so assign its result instead of appending it to the
        # variable (appending repeated the pre-existing options).
        client_opt = get_additional_client_options(args)
        if args.secure:
            client_opt += " --secure "
        os.environ["CLICKHOUSE_CLIENT_OPT"] = client_opt

        # Set options for curl
        if "CLICKHOUSE_URL_PARAMS" in os.environ:
            os.environ["CLICKHOUSE_URL_PARAMS"] += "&"
        else:
            os.environ["CLICKHOUSE_URL_PARAMS"] = ""

        client_options_query_str = get_additional_client_options_url(args)
        args.client_options_query_str = client_options_query_str + "&"
        args.client_options_query_str += os.environ["CLICKHOUSE_URL_PARAMS"]
        os.environ["CLICKHOUSE_URL_PARAMS"] += client_options_query_str
    else:
        args.client_options_query_str = ""

    # A bare "-j" (no value) yields None: use one job per CPU.
    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    if args.db_engine and args.db_engine == "Ordinary":
        MESSAGES_TO_RETRY.append(" locking attempt on ")

    if args.replace_replicated_with_shared:
        args.s3_storage = True

    main(args)