ClickHouse/tests/clickhouse-test

2698 lines
91 KiB
Plaintext
Raw Normal View History

2020-10-02 16:54:07 +00:00
#!/usr/bin/env python3
# pylint: disable=too-many-return-statements
# pylint: disable=global-variable-not-assigned
2022-04-28 11:26:49 +00:00
# pylint: disable=too-many-lines
2023-01-17 09:17:51 +00:00
# pylint: disable=anomalous-backslash-in-string
import enum
2022-06-27 20:54:52 +00:00
from queue import Full
import shutil
import sys
import os
import os.path
import signal
import re
import copy
import traceback
import math
2022-04-27 11:02:45 +00:00
# Not requests, to avoid requiring extra dependency.
import http.client
import urllib.parse
import json
2022-04-27 11:02:45 +00:00
# for crc32
import zlib
from argparse import ArgumentParser
from typing import Tuple, Union, Optional, Dict, Set, List
import subprocess
from subprocess import Popen
from subprocess import PIPE
2016-09-02 16:26:09 +00:00
from datetime import datetime
2020-08-26 17:44:03 +00:00
from time import time, sleep
2016-09-02 16:26:09 +00:00
from errno import ESRCH
try:
2022-04-28 11:26:49 +00:00
import termcolor # type: ignore
except ImportError:
termcolor = None
import random
import string
2019-06-03 17:36:27 +00:00
import multiprocessing
2021-08-02 13:51:33 +00:00
import socket
from contextlib import closing
USE_JINJA = True
try:
import jinja2
except ImportError:
USE_JINJA = False
2022-04-27 11:02:45 +00:00
print("WARNING: jinja2 not installed! Template tests will be skipped.")
2019-03-13 16:47:02 +00:00
# Substrings that mark a failure as transient: need_retry() reports a retry
# when any of them occurs in a test's stdout or stderr.
MESSAGES_TO_RETRY = [
    "ConnectionPoolWithFailover: Connection failed at try",
    "DB::Exception: New table appeared in database being dropped or detached. Try again",
    "is already started to be removing by another replica right now",
]

# Run-count bound consulted by TestResult.check_if_need_retry(): no retry is
# requested once the run count exceeds this value.
MAX_RETRIES = 3

# File extensions of test cases.
# NOTE(review): the consumer is outside this excerpt; presumably used when
# discovering test files - confirm.
TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]

# Dotted numeric version with one to four components, e.g. "23" or "23.3.1.2".
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"
def stringhash(s):
    """Return the CRC32 checksum of ``s`` encoded as UTF-8.

    The builtin ``hash()`` is consistent only within a single process
    invocation (https://stackoverflow.com/a/42089311), so it cannot be used
    where the value has to be the same between runs.
    """
    encoded = s.encode("utf-8")
    return zlib.crc32(encoded)
2023-02-06 19:27:46 +00:00
# First and last lines of the log
def trim_for_log(s, max_lines=10000):
    """Shorten a multi-line text so that it fits into a test report.

    Args:
        s: text to trim; a falsy value (``None`` or ``""``) is returned as is.
        max_lines: upper bound on the number of original lines that are kept.
            The default matches the historical hard-coded limit (the first
            5000 and the last 5000 lines).

    Returns:
        The text with its lines re-joined by ``"\\n"``.  When the text has more
        than ``max_lines`` lines, the middle is replaced with one separator
        line that states how many lines were hidden.
    """
    if not s:
        return s

    max_lines = max(max_lines, 0)
    lines = s.splitlines()
    hidden = len(lines) - max_lines
    if hidden <= 0:
        return "\n".join(lines)

    head_count = max_lines // 2
    tail_count = max_lines - head_count
    separator = "-" * 40 + str(hidden) + " lines are hidden" + "-" * 40
    # lines[-0:] would be the whole list, so an empty tail needs an explicit guard.
    tail = lines[-tail_count:] if tail_count > 0 else []
    return "\n".join(lines[:head_count] + [separator] + tail)
class HTTPError(Exception):
    """Failure reported by the HTTP interface: a message plus a status code."""

    def __init__(self, message=None, code=None):
        super().__init__(message)
        self.message = message
        self.code = code

    def __str__(self):
        return "Code: {}. {}".format(self.code, self.message)
2022-04-27 11:02:45 +00:00
# Helpers to execute queries via HTTP interface.
2022-04-27 11:02:45 +00:00
def clickhouse_execute_http(
    base_args,
    query,
    timeout=30,
    settings=None,
    default_format=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run ``query`` through the server's HTTP interface and return the raw body.

    Args:
        base_args: options object; ``secure``, ``tcp_host``, ``http_port`` and
            ``client_options_query_str`` are read from it.
        query: SQL text, sent as the ``query`` URL parameter of a POST request.
        timeout: socket timeout; its integer value is also sent as the
            connect/receive/send timeout settings of the query.
        settings: optional mapping of extra URL parameters (overrides defaults).
        default_format: optional value for the ``default_format`` parameter.
        max_http_retries: number of attempts; at least one attempt is made.
        retry_error_codes: when true, a non-200 response is retried as well.

    Returns:
        The response body as ``bytes``.

    Raises:
        HTTPError: the final response status is not 200.
        Exception: whatever the last attempt raised when every attempt failed.
    """
    # Take the TLS flag from the same object that supplies host and port, so a
    # caller-provided ``base_args`` is honoured consistently.
    connection_class = (
        http.client.HTTPSConnection if base_args.secure else http.client.HTTPConnection
    )
    client = connection_class(
        host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
    )

    timeout = int(timeout)
    params = {
        "query": query,
        # hung check in stress tests may remove the database,
        # hence we should use 'system'.
        "database": "system",
        "connect_timeout": timeout,
        "receive_timeout": timeout,
        "send_timeout": timeout,
        "http_connection_timeout": timeout,
        "http_receive_timeout": timeout,
        "http_send_timeout": timeout,
        "output_format_parallel_formatting": 0,
    }
    if settings is not None:
        params.update(settings)
    if default_format is not None:
        params["default_format"] = default_format

    url = f"/?{base_args.client_options_query_str}{urllib.parse.urlencode(params)}"
    # Guarantee one attempt so that ``res``/``data`` are always bound below.
    attempts = max(1, max_http_retries)

    try:
        for attempt in range(attempts):
            try:
                client.request("POST", url)
                res = client.getresponse()
                data = res.read()
                if res.status == 200 or not retry_error_codes:
                    break
            except Exception:
                if attempt == attempts - 1:
                    raise
                # Drop the broken connection; the next request() reconnects.
                client.close()
                sleep(attempt + 1)
    finally:
        # The body has been read completely (or the call failed), so the
        # socket is no longer needed.
        client.close()

    if res.status != 200:
        raise HTTPError(data.decode(), res.status)

    return data
2023-03-23 15:33:23 +00:00
def clickhouse_execute(
    base_args,
    query,
    timeout=30,
    settings=None,
    max_http_retries=5,
    retry_error_codes=False,
):
    """Run ``query`` via clickhouse_execute_http() and return the stripped body.

    All arguments are forwarded unchanged; no default format is requested.
    """
    response = clickhouse_execute_http(
        base_args,
        query,
        timeout,
        settings,
        max_http_retries=max_http_retries,
        retry_error_codes=retry_error_codes,
    )
    return response.strip()
2022-04-27 11:02:45 +00:00
2023-03-23 15:33:23 +00:00
def clickhouse_execute_json(
    base_args, query, timeout=60, settings=None, max_http_retries=5
):
    """Run ``query`` with the JSONEachRow format and return the decoded rows.

    Returns ``None`` when the response body is empty, otherwise a list with
    one ``json.loads`` result per line of the body.
    """
    data = clickhouse_execute_http(
        base_args,
        query,
        timeout,
        settings,
        "JSONEachRow",
        max_http_retries=max_http_retries,
    )
    if not data:
        return None
    return [json.loads(row) for row in data.strip().splitlines()]
class Terminated(KeyboardInterrupt):
    """Raised by signal_handler() to report the signal it was invoked with."""
2022-04-27 11:02:45 +00:00
def signal_handler(sig, frame):
    """Signal handler that turns a delivered signal into a Terminated exception.

    ``frame`` is accepted because the handler signature requires it; it is
    not used.
    """
    message = f"Terminated with {sig} signal"
    raise Terminated(message)
def stop_tests():
    """Signal every process of the current process group to stop, once.

    The first caller sets ``stop_tests_triggered`` and sends SIGTERM to the
    process group; later callers only print the message.
    """
    global stop_tests_triggered_lock
    global stop_tests_triggered
    global restarted_tests

    with stop_tests_triggered_lock:
        print("Stopping tests")
        if stop_tests_triggered.is_set():
            return

        stop_tests_triggered.set()

        # materialize multiprocessing.Manager().list() object before
        # sending SIGTERM since this object is a proxy, that requires
        # communicating with manager thread, but after SIGTERM will be
        # send, this thread will die, and you will get
        # ConnectionRefusedError error for any access to "restarted_tests"
        # variable.
        restarted_tests = [*restarted_tests]

        # send signal to all processes in group to avoid hung check triggering
        # (to avoid terminating clickhouse-test itself, the signal should be ignored)
        own_group = os.getpgid(os.getpid())
        signal.signal(signal.SIGTERM, signal.SIG_IGN)
        os.killpg(own_group, signal.SIGTERM)
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
2021-02-15 10:26:34 +00:00
def get_db_engine(args, database_name):
    """Return the engine clause to append to a CREATE DATABASE statement.

    A Replicated engine on the test cluster is used when
    ``args.replicated_database`` is set; otherwise ``args.db_engine`` is used
    when given; otherwise the clause is empty.
    """
    if args.replicated_database:
        return (
            " ON CLUSTER test_cluster_database_replicated "
            f"ENGINE=Replicated('/test/clickhouse/db/{database_name}', "
            "'{shard}', '{replica}')"
        )
    if not args.db_engine:
        return ""  # Will use default engine
    return " ENGINE=" + args.db_engine
2022-08-12 09:28:16 +00:00
def get_create_database_settings(args, testcase_args):
    """Build the settings mapping sent along with CREATE DATABASE.

    ``log_comment`` carries the test case basename when ``testcase_args`` is
    given; the deprecated Ordinary engine is explicitly allowed when it is the
    requested ``args.db_engine``.
    """
    settings = {}
    if testcase_args:
        settings["log_comment"] = testcase_args.testcase_basename
    if args.db_engine == "Ordinary":
        settings["allow_deprecated_database_ordinary"] = 1
    return settings
def get_zookeeper_session_uptime(args):
    """Return the ZooKeeper session uptime reported by the server as an int.

    With ``args.replicated_database`` the minimum over all replicas of the
    test cluster is requested.  Any failure yields ``None``.
    """
    try:
        if not args.replicated_database:
            return int(clickhouse_execute(args, "SELECT zookeeperSessionUptime()"))

        uptime = clickhouse_execute(
            args,
            """
            SELECT min(materialize(zookeeperSessionUptime()))
            FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
            """,
        )
        return int(uptime)
    except Exception:
        return None
def need_retry(args, stdout, stderr, total_time):
    """Tell whether a failed test run looks transient and should be retried.

    Returns True when the ZooKeeper session is younger than the test run
    (checked only with ``args.check_zookeeper_session``) or when stdout or
    stderr contains one of MESSAGES_TO_RETRY.
    """
    if args.check_zookeeper_session:
        # Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
        # instead of "Session expired" or "Connection loss"
        # Retry if session was expired during test execution.
        # If ZooKeeper is configured, then it's more reliable than checking stderr,
        # but the following condition is always true if ZooKeeper is not configured.
        session_uptime = get_zookeeper_session_uptime(args)
        if session_uptime is not None and session_uptime < math.ceil(total_time):
            return True

    for output in (stdout, stderr):
        for msg in MESSAGES_TO_RETRY:
            if msg in output:
                return True
    return False
2019-03-13 16:47:02 +00:00
def get_processlist_with_stacktraces(args):
    """Return the server process list joined with per-thread stack traces.

    The query reads system.processes and system.stack_trace (on every replica
    of the test cluster with ``args.replicated_database``).  On any failure a
    descriptive string is returned instead of raising.
    """
    try:
        if args.replicated_database:
            query = """
                SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
                -- NOTE: view() here to do JOIN on shards, instead of initiator
                FROM clusterAllReplicas('test_cluster_database_replicated', view(
                    SELECT
                        arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                            x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                            s.trace), '\n') AS stacktrace
                        )) AS stacktraces,
                        p.*
                    FROM system.processes p
                    JOIN system.stack_trace s USING (query_id)
                    WHERE query NOT LIKE '%system.processes%'
                    GROUP BY p.*
                ))
                ORDER BY elapsed DESC FORMAT Vertical
                """
        else:
            query = """
                SELECT
                    arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
                        x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
                        s.trace), '\n') AS stacktrace
                    )) AS stacktraces,
                    p.*
                FROM system.processes p
                JOIN system.stack_trace s USING (query_id)
                WHERE query NOT LIKE '%system.processes%'
                GROUP BY p.*
                ORDER BY elapsed DESC FORMAT Vertical
                """

        # Both variants need introspection functions for addressToLine() etc.
        return clickhouse_execute(
            args,
            query,
            settings={
                "allow_introspection_functions": 1,
            },
        )
    except Exception as e:
        return "Failed to get processlist: " + str(e)
2022-04-27 11:02:45 +00:00
2022-03-14 20:43:34 +00:00
def get_transactions_list(args):
    """Return the rows of system.transactions as decoded JSON objects.

    With ``args.replicated_database`` every replica of the test cluster is
    queried.  On any failure a descriptive string is returned instead.
    """
    try:
        if args.replicated_database:
            query = (
                "SELECT materialize((hostName(), tcpPort())) as host, * FROM "
                "clusterAllReplicas('test_cluster_database_replicated', system.transactions)"
            )
        else:
            query = "select * from system.transactions"
        return clickhouse_execute_json(args, query)
    except Exception as e:
        return f"Cannot get list of transactions: {e}"
2022-04-27 11:02:45 +00:00
2020-03-23 18:17:07 +00:00
# collect server stacktraces using gdb
def get_stacktraces_from_gdb(server_pid):
    """Attach gdb to ``server_pid`` and return the backtraces of all threads.

    Returns the decoded gdb output, or ``None`` (after printing the error)
    when gdb could not be run successfully.
    """
    try:
        cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
        raw_output = subprocess.check_output(cmd, shell=True)
        return raw_output.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from gdb: {e}")
        return None
2020-03-23 18:17:07 +00:00
# collect server stacktraces from system.stack_trace table
# it does not work in Sandbox
def get_stacktraces_from_clickhouse(args):
    """Query system.stack_trace through the command-line client.

    Returns the client output (stderr included), or ``None`` after printing
    the error when the command fails.
    """
    settings_str = " ".join(
        (
            get_additional_client_options(args),
            "--allow_introspection_functions=1",
            "--skip_unavailable_shards=1",
        )
    )
    command_prefix = f"{args.client} {settings_str} --query "

    # Expression shared by both queries: one "line: symbol" entry per frame.
    trace_expr = (
        "arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
        "arrayMap(x -> addressToLine(x), trace), "
        "arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
    )

    if args.replicated_database:
        command = (
            command_prefix
            + '"SELECT materialize((hostName(), tcpPort())) as host, thread_id, '
            + trace_expr
            + "FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
            + 'ORDER BY host, thread_id FORMAT Vertical"'
        )
    else:
        command = (
            command_prefix
            + '"SELECT '
            + trace_expr
            + 'FROM system.stack_trace FORMAT Vertical"'
        )

    try:
        raw_output = subprocess.check_output(
            command,
            shell=True,
            stderr=subprocess.STDOUT,
        )
        return raw_output.decode("utf-8")
    except Exception as e:
        print(f"Error occurred while receiving stack traces from client: {e}")
        return None
def print_stacktraces() -> None:
    """Print server stack traces, preferring gdb and falling back to SQL.

    gdb is tried only when a server pid was found and the run does not use a
    replicated database.  When gdb yields nothing usable, the traces are
    requested from the server itself.  When neither source returns anything,
    a message that the server could not be located is printed.
    """
    server_pid = get_server_pid()

    bt = None

    if server_pid and not args.replicated_database:
        print("")
        print(
            f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}"
        )
        print("Collecting stacktraces from all running threads with gdb:")

        bt = get_stacktraces_from_gdb(server_pid)

        # get_stacktraces_from_gdb() returns None on failure; calling len() on
        # it would raise TypeError and skip the fallback below.
        if bt is not None and len(bt) < 1000:
            print("Got suspiciously small stacktraces: ", bt)
            bt = None

    if bt is None:
        print("\nCollecting stacktraces from system.stacktraces table:")

        bt = get_stacktraces_from_clickhouse(args)

    if bt is not None:
        print(bt)
        return

    print(
        colored(
            f"\nUnable to locate ClickHouse server process listening at TCP port "
            f"{args.tcp_port}. It must have crashed or exited prematurely!",
            args,
            "red",
            attrs=["bold"],
        )
    )
def get_server_pid():
    """Return the pid of the ClickHouse server, or ``None`` if it is not found.

    ``lsof`` on the configured TCP port is tried first, then ``pidof``.  A
    failing command is reported on stdout and the next one is tried.
    """
    # lsof does not work in stress tests for some reason
    cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | sed 's/^p//p;d'"
    cmd_pidof = "pidof -s clickhouse-server"

    output = None
    for cmd in (cmd_lsof, cmd_pidof):
        try:
            output = subprocess.check_output(
                cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True
            )
            if output:
                return int(output)
        except Exception as e:
            print(f"Cannot get server pid with {cmd}, got {output}: {e}")

    return None  # most likely server is dead
def colored(text, args, color=None, on_color=None, attrs=None):
    """Return ``text`` wrapped by termcolor when colouring is available.

    Colour is applied only if termcolor was imported and either stdout is a
    terminal or ``args.force_color`` is set; otherwise ``text`` is returned
    unchanged.
    """
    use_color = termcolor and (sys.stdout.isatty() or args.force_color)
    if not use_color:
        return text
    return termcolor.colored(text, color, on_color, attrs)
class TestStatus(enum.Enum):
    """Overall outcome of a test case, as stored in TestResult.status."""

    FAIL = "FAIL"
    UNKNOWN = "UNKNOWN"
    OK = "OK"
    SKIPPED = "SKIPPED"
class FailureReason(enum.Enum):
    """Why a test ended up FAIL, SKIPPED or UNKNOWN.

    The value of each member is its human-readable text.
    NOTE(review): values ending with ": " look like prefixes that get details
    appended where they are printed - confirm against the reporting code.
    """

    # FAIL reasons
    TIMEOUT = "Timeout!"
    SERVER_DIED = "server died"
    EXIT_CODE = "return code: "
    STDERR = "having stderror: "
    EXCEPTION = "having exception in stdout: "
    RESULT_DIFF = "result differs with reference: "
    TOO_LONG = "Test runs too long (> 60s). Make it faster."
    INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"

    # SKIPPED reasons
    DISABLED = "disabled"
    SKIP = "skip"
    NO_JINJA = "no jinja"
    NO_ZOOKEEPER = "no zookeeper"
    NO_SHARD = "no shard"
    FAST_ONLY = "running fast tests only"
    NO_LONG = "not running long tests"
    REPLICATED_DB = "replicated-database"
    S3_STORAGE = "s3-storage"
    BUILD = "not running for current build"
    NO_UPGRADE_CHECK = "not running for upgrade check"
    NO_PARALLEL_REPLICAS = "smth in not supported with parallel replicas"

    # UNKNOWN reasons
    NO_REFERENCE = "no reference file"
    INTERNAL_ERROR = "Test internal error: "
def threshold_generator(always_on_prob, always_off_prob, min_val, max_val):
    """Build a zero-argument generator of random threshold values.

    Each call of the returned function draws one uniform number and returns
    ``min_val`` with probability ``always_on_prob``, ``max_val`` with
    probability ``always_off_prob``, and otherwise a random value between the
    two bounds: an int when both bounds are ints, a float otherwise.
    """
    off_cutoff = always_on_prob + always_off_prob
    both_int = isinstance(min_val, int) and isinstance(max_val, int)

    def gen():
        roll = random.random()
        if roll <= always_on_prob:
            return min_val
        if roll <= off_cutoff:
            return max_val
        sample = random.randint if both_int else random.uniform
        return sample(min_val, max_val)

    return gen
class SettingsRandomizer:
    """Random values for query-level settings.

    ``settings`` maps a setting name to a zero-argument callable producing a
    fresh random value for it.
    """

    settings = {
        "max_insert_threads": (
            lambda: 0 if random.random() < 0.5 else random.randint(1, 16)
        ),
        "group_by_two_level_threshold": threshold_generator(0.2, 0.2, 1, 1000000),
        "group_by_two_level_threshold_bytes": threshold_generator(
            0.2, 0.2, 1, 50000000
        ),
        "distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
        "fsync_metadata": lambda: random.randint(0, 1),
        "output_format_parallel_formatting": lambda: random.randint(0, 1),
        "input_format_parallel_parsing": lambda: random.randint(0, 1),
        "min_chunk_bytes_for_parallel_parsing": lambda: max(
            1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))
        ),
        "max_read_buffer_size": lambda: random.randint(500000, 1048576),
        "prefer_localhost_replica": lambda: random.randint(0, 1),
        "max_block_size": lambda: random.randint(8000, 100000),
        "max_threads": lambda: random.randint(1, 64),
        "optimize_or_like_chain": lambda: random.randint(0, 1),
        "optimize_read_in_order": lambda: random.randint(0, 1),
        "read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
        "optimize_aggregation_in_order": lambda: random.randint(0, 1),
        "aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
        "min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
        "use_uncompressed_cache": lambda: random.randint(0, 1),
        "min_bytes_to_use_direct_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "min_bytes_to_use_mmap_io": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "local_filesystem_read_method": lambda: random.choice(
            ["read", "pread", "mmap", "pread_threadpool", "io_uring"]
        ),
        "remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
        "local_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
        "compile_expressions": lambda: random.randint(0, 1),
        "compile_aggregate_expressions": lambda: random.randint(0, 1),
        "compile_sort_description": lambda: random.randint(0, 1),
        "merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
        "optimize_distinct_in_order": lambda: random.randint(0, 1),
        "optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
        "http_response_buffer_size": lambda: random.randint(0, 10 * 1048576),
        "http_wait_end_of_query": lambda: random.random() > 0.5,
        "enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(
            0, 1
        ),
    }

    @staticmethod
    def get_random_settings():
        """Return one ``name=value`` string per setting, in declaration order."""
        return [
            f"{setting}={generator()}"
            for setting, generator in SettingsRandomizer.settings.items()
        ]
2022-07-07 22:16:01 +00:00
class MergeTreeSettingsRandomizer:
    """Random values for MergeTree-level settings.

    ``settings`` maps a setting name to a zero-argument callable producing a
    fresh random value for it.
    """

    settings = {
        "ratio_of_defaults_for_sparse_serialization": threshold_generator(
            0.3, 0.5, 0.0, 1.0
        ),
        "prefer_fetch_merged_part_size_threshold": threshold_generator(
            0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
        ),
        "vertical_merge_algorithm_min_rows_to_activate": threshold_generator(
            0.4, 0.4, 1, 1000000
        ),
        "vertical_merge_algorithm_min_columns_to_activate": threshold_generator(
            0.4, 0.4, 1, 100
        ),
        "allow_vertical_merges_from_compact_to_wide_parts": lambda: random.randint(
            0, 1
        ),
        "min_merge_bytes_to_use_direct_io": threshold_generator(
            0.25, 0.25, 1, 10 * 1024 * 1024 * 1024
        ),
        "index_granularity_bytes": lambda: random.randint(1024, 30 * 1024 * 1024),
        "merge_max_block_size": lambda: random.randint(1, 8192 * 3),
        "index_granularity": lambda: random.randint(1, 65536),
        "min_bytes_for_wide_part": threshold_generator(0.3, 0.3, 0, 1024 * 1024 * 1024),
        "compress_marks": lambda: random.randint(0, 1),
        "compress_primary_key": lambda: random.randint(0, 1),
        "marks_compress_block_size": lambda: random.randint(8000, 100000),
        "primary_key_compress_block_size": lambda: random.randint(8000, 100000),
    }

    @staticmethod
    def get_random_settings(args):
        """Return ``name=value`` strings for the settings that may be randomized.

        Settings listed in ``args.changed_merge_tree_settings`` are left out,
        and no value is generated for them.
        """
        return [
            f"{setting}={generator()}"
            for setting, generator in MergeTreeSettingsRandomizer.settings.items()
            if setting not in args.changed_merge_tree_settings
        ]
class TestResult:
    """Outcome of one test run: status, reason, duration and a description."""

    def __init__(
        self,
        case_name: str,
        status: TestStatus,
        reason: Optional[FailureReason],
        total_time: float,
        description: str,
    ):
        self.case_name: str = case_name
        self.status: TestStatus = status
        self.reason: Optional[FailureReason] = reason
        self.total_time: float = total_time
        self.description: str = description
        # Set by check_if_need_retry() when the run should be repeated.
        self.need_retry: bool = False

    def check_if_need_retry(self, args, stdout, stderr, runs_count):
        """Mark the result for retry when a failure looks transient.

        The flag is set only for a FAIL status, only when need_retry() agrees,
        and only while ``runs_count`` does not exceed MAX_RETRIES.
        """
        if self.status != TestStatus.FAIL:
            return
        # need_retry() may query the server, so it runs only for failures.
        if not need_retry(args, stdout, stderr, self.total_time):
            return
        if MAX_RETRIES < runs_count:
            return
        self.need_retry = True
class TestCase:
@staticmethod
def get_description_from_exception_info(exc_info):
exc_type, exc_value, tb = exc_info
exc_name = exc_type.__name__
traceback_str = "\n".join(traceback.format_tb(tb, 10))
description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
return description
@staticmethod
def get_reference_file(suite_dir, name):
    """
    Returns reference file name for specified test

    The ".gen" suffix is removed from ``name`` first; then "<name>.reference"
    and "<name>.gen.reference" are probed in that order.  ``None`` is returned
    when neither file exists.
    """
    base_path = os.path.join(suite_dir, removesuffix(name, ".gen"))
    candidates = (base_path + ext for ext in (".reference", ".gen.reference"))
    return next((path for path in candidates if os.path.isfile(path)), None)
@staticmethod
def configure_testcase_args(args, case_file, suite_tmp_dir):
    """Build the per-test copy of ``args`` and prepare its environment.

    Side effects: sets CLICKHOUSE_DATABASE and CLICKHOUSE_TMP in
    ``os.environ``; when ``args.database`` is not given, a database with a
    random name is created on the server and a matching temporary directory
    is created under ``suite_tmp_dir``.

    Returns the deep copy of ``args`` extended with the ``testcase_*``,
    ``test_tmp_dir`` and ``debug_log_file`` attributes.
    """
    testcase_args = copy.deepcopy(args)

    testcase_args.testcase_start_time = datetime.now()
    testcase_basename = os.path.basename(case_file)
    # The client command tags every query with the test file name.
    testcase_args.testcase_client = (
        f"{testcase_args.client} --log_comment '{testcase_basename}'"
    )
    testcase_args.testcase_basename = testcase_basename

    if testcase_args.database:
        database = testcase_args.database
        # setdefault(): values already present in the environment win.
        os.environ.setdefault("CLICKHOUSE_DATABASE", database)
        os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
        testcase_args.test_tmp_dir = suite_tmp_dir
    else:
        # If --database is not specified, we will create temporary database with
        # unique name and we will recreate and drop it for each test
        def random_str(length=8):
            alphabet = string.ascii_lowercase + string.digits
            # NOTE: it is important not to use default random generator, since it shares state.
            return "".join(
                random.SystemRandom().choice(alphabet) for _ in range(length)
            )

        database = f"test_{random_str()}"

        clickhouse_execute(
            args,
            "CREATE DATABASE IF NOT EXISTS "
            + database
            + get_db_engine(testcase_args, database),
            settings=get_create_database_settings(args, testcase_args),
        )

        os.environ["CLICKHOUSE_DATABASE"] = database
        # Set temporary directory to match the randomly generated database,
        # because .sh tests also use it for temporary files and we want to avoid
        # collisions.
        testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
        os.mkdir(testcase_args.test_tmp_dir)
        os.environ["CLICKHOUSE_TMP"] = testcase_args.test_tmp_dir

    testcase_args.testcase_database = database

    # Printed only in case of failures
    #
    # NOTE: here we use "CLICKHOUSE_TMP" instead of "file_suffix",
    # so it is installed in configure_testcase_args() unlike other files
    # (stdout_file, stderr_file) in TestCase::__init__().
    # Since using CLICKHOUSE_TMP is easier to use in expect.
    testcase_args.debug_log_file = (
        os.path.join(testcase_args.test_tmp_dir, testcase_basename) + ".debuglog"
    )

    return testcase_args
@staticmethod
2022-07-14 11:42:12 +00:00
def cli_format_settings(settings_list) -> str:
2022-07-07 22:16:01 +00:00
return " ".join([f"--{setting}" for setting in settings_list])
2022-04-28 11:26:49 +00:00
def has_show_create_table_in_test(self):
2022-10-28 23:26:06 +00:00
return not subprocess.call(["grep", "-iq", "show create", self.case_file])
2022-02-11 14:15:56 +00:00
def add_random_settings(self, client_options):
new_options = ""
if self.randomize_settings:
if len(self.base_url_params) == 0:
os.environ["CLICKHOUSE_URL_PARAMS"] = "&".join(self.random_settings)
else:
os.environ["CLICKHOUSE_URL_PARAMS"] = (
self.base_url_params + "&" + "&".join(self.random_settings)
)
new_options += f" {self.cli_format_settings(self.random_settings)}"
if self.randomize_merge_tree_settings:
2022-07-07 22:16:01 +00:00
new_options += f" --allow_merge_tree_settings {self.cli_format_settings(self.merge_tree_random_settings)}"
if new_options != "":
new_options += " --allow_repeated_settings"
os.environ["CLICKHOUSE_CLIENT_OPT"] = (
self.base_client_options + new_options + " "
2022-04-27 11:02:45 +00:00
)
return client_options + new_options
def remove_random_settings_from_env(self):
2022-04-27 11:02:45 +00:00
os.environ["CLICKHOUSE_URL_PARAMS"] = self.base_url_params
os.environ["CLICKHOUSE_CLIENT_OPT"] = self.base_client_options
def add_info_about_settings(self, description):
if self.randomize_settings:
description += f"\nSettings used in the test: {self.cli_format_settings(self.random_settings)}"
if self.randomize_merge_tree_settings:
description += f"\n\nMergeTree settings used in test: {self.cli_format_settings(self.merge_tree_random_settings)}"
2022-02-11 14:15:56 +00:00
2022-07-07 22:16:01 +00:00
return description + "\n"
def __init__(self, suite, case: str, args, is_concurrent: bool):
    """Collect the paths, tags and randomized settings of one test case.

    Args:
        suite: provides ``all_tags``, ``suite_path`` and ``suite_tmp_path``.
        case: test case file name inside the suite directory.
        args: run options (``test_runs``, ``no_random_settings``,
            ``no_random_merge_tree_settings`` and others are read).
        is_concurrent: together with ``args.test_runs > 1`` it makes the
            stdout/stderr file names carry the process id.
    """
    self.case: str = case  # case file name
    # NOTE(review): when the case has tags, this is the very set stored in
    # suite.all_tags, so the add() calls below modify the suite's data too.
    self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set()

    # NOTE(review): with GLOBAL_TAGS unset, "".split(",") yields [""] and an
    # empty-string tag is added, which also makes self.tags always truthy.
    for tag in os.getenv("GLOBAL_TAGS", "").split(","):
        self.tags.add(tag.strip())

    self.case_file: str = os.path.join(suite.suite_path, case)
    (self.name, self.ext) = os.path.splitext(case)

    file_suffix = f".{os.getpid()}" if is_concurrent and args.test_runs > 1 else ""
    self.reference_file = self.get_reference_file(suite.suite_path, self.name)
    self.stdout_file = (
        os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stdout"
    )
    self.stderr_file = (
        os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stderr"
    )

    self.testcase_args = None
    self.runs_count = 0

    has_no_random_settings_tag = self.tags and "no-random-settings" in self.tags

    self.randomize_settings = not (
        args.no_random_settings or has_no_random_settings_tag
    )

    has_no_random_merge_tree_settings_tag = (
        self.tags and "no-random-merge-tree-settings" in self.tags
    )

    # If test contains SHOW CREATE TABLE do not
    # randomize merge tree settings, because
    # they will be added to table definition and test will fail
    self.randomize_merge_tree_settings = not (
        args.no_random_merge_tree_settings
        or has_no_random_settings_tag
        or has_no_random_merge_tree_settings_tag
        or self.has_show_create_table_in_test()
    )

    # The random_* attributes exist only when the matching flag is set.
    if self.randomize_settings:
        self.random_settings = SettingsRandomizer.get_random_settings()

    if self.randomize_merge_tree_settings:
        self.merge_tree_random_settings = (
            MergeTreeSettingsRandomizer.get_random_settings(args)
        )

    # Environment values as they were at construction time; they are written
    # back by remove_random_settings_from_env().
    self.base_url_params = (
        os.environ["CLICKHOUSE_URL_PARAMS"]
        if "CLICKHOUSE_URL_PARAMS" in os.environ
        else ""
    )

    self.base_client_options = (
        os.environ["CLICKHOUSE_CLIENT_OPT"]
        if "CLICKHOUSE_CLIENT_OPT" in os.environ
        else ""
    )
# should skip test, should increment skipped_total, skip reason
def should_skip_test(self, suite) -> Optional[FailureReason]:
    """Decide whether this test case must be skipped in the current run.

    The checks combine the test's tags, marker files in the suite directory
    and the global run options (``args``).  The first matching rule wins.

    Returns:
        The FailureReason describing why the test is skipped, or ``None``
        when the test should run.
    """
    tags = self.tags

    if tags and ("disabled" in tags) and not args.disabled:
        return FailureReason.DISABLED

    disabled_marker = os.path.join(suite.suite_path, self.name) + ".disabled"
    if os.path.exists(disabled_marker) and not args.disabled:
        return FailureReason.DISABLED

    if "no-parallel-replicas" in tags and args.no_parallel_replicas:
        return FailureReason.NO_PARALLEL_REPLICAS

    if args.skip and any(s in self.name for s in args.skip):
        return FailureReason.SKIP

    if not USE_JINJA and self.ext.endswith("j2"):
        return FailureReason.NO_JINJA

    if (
        tags
        and (("zookeeper" in tags) or ("replica" in tags))
        and not args.zookeeper
    ):
        return FailureReason.NO_ZOOKEEPER

    if (
        tags
        and (("shard" in tags) or ("distributed" in tags) or ("global" in tags))
        and not args.shard
    ):
        return FailureReason.NO_SHARD

    if tags and ("no-fasttest" in tags) and args.fast_tests_only:
        return FailureReason.FAST_ONLY

    if (
        tags
        and (("long" in tags) or ("deadlock" in tags) or ("race" in tags))
        and args.no_long
    ):
        # Tests for races and deadlocks usually are run in a loop for a significant amount of time
        return FailureReason.NO_LONG

    if tags and ("no-replicated-database" in tags) and args.replicated_database:
        return FailureReason.REPLICATED_DB

    # TODO: remove checking "no-upgrade-check" after 23.1
    # NOTE(review): the condition used to test this same tag twice; the TODO
    # presumably referred to a legacy tag name - confirm before removing.
    if args.upgrade_check and "no-upgrade-check" in tags:
        return FailureReason.NO_UPGRADE_CHECK

    if tags and ("no-s3-storage" in tags) and args.s3_storage:
        return FailureReason.S3_STORAGE

    if tags:
        # "no-<flag>" excludes the test from builds that have <flag>.
        for build_flag in args.build_flags:
            if "no-" + build_flag in tags:
                return FailureReason.BUILD
        # "use-<flag>" requires <flag>; tags are compared with "-" replaced by "_".
        for tag in tags:
            tag = tag.replace("-", "_")
            if tag.startswith("use_") and tag not in args.build_flags:
                return FailureReason.BUILD

    return None
2022-07-08 19:27:16 +00:00
def process_result_impl(
    self, proc, stdout: str, stderr: str, debug_log: str, total_time: float
):
    """
    Turn the raw outcome of one test run into a TestResult.

    The checks are applied in order and the first one that matches wins:
    process still running (timeout), non-zero exit code, non-empty stderr,
    "Exception" in stdout, "@@SKIP@@" in stdout, no reference file,
    output different from the reference, and (when test_runs > 1) a run
    longer than 120 seconds for a test without the "long" tag.

    :param proc: the test process; may be None, in which case the
        process-specific checks are skipped
    :param stdout: text captured from the test's stdout file
    :param stderr: text captured from the test's stderr file
    :param debug_log: debug log text; it is trimmed here before being attached
    :param total_time: duration of the run in seconds
    :return: TestResult for the run. The stdout, stderr and debug log files
        are removed only on the OK path.
    """
    description = ""
    debug_log = trim_for_log(debug_log)

    def with_debug_log(text: str) -> str:
        # Attach the trimmed debug log (if there is one) on its own line.
        if debug_log:
            text += "\n"
            text += debug_log
        return text

    def read_stdout_file() -> str:
        # The handle is closed deterministically, and undecodable bytes are
        # replaced instead of raising: the file may hold binary output (see
        # the "Binary files" branch below).
        with open(
            self.stdout_file, "r", encoding="utf-8", errors="replace"
        ) as stdout_fd:
            return stdout_fd.read()

    if proc:
        if proc.returncode is None:
            # The process has not finished: kill it and report a timeout.
            try:
                proc.kill()
            except OSError as e:
                # ESRCH: the process is already gone, nothing to kill.
                if e.errno != ESRCH:
                    raise

            if stderr:
                description += stderr
            description = with_debug_log(description)
            return TestResult(
                self.name,
                TestStatus.FAIL,
                FailureReason.TIMEOUT,
                total_time,
                description,
            )

        if proc.returncode != 0:
            reason = FailureReason.EXIT_CODE
            description += str(proc.returncode)

            if stderr:
                description += "\n"
                description += stderr
            description = with_debug_log(description)

            # Stop on fatal errors like segmentation fault. They are sent to client via logs.
            if " <Fatal> " in stderr:
                reason = FailureReason.SERVER_DIED

            connection_lost = (
                "Connection refused" in stderr
                or "Attempt to read after eof" in stderr
            )
            if (
                self.testcase_args.stop
                and connection_lost
                and "Received exception from server" not in stderr
            ):
                reason = FailureReason.SERVER_DIED

            if os.path.isfile(self.stdout_file):
                description += ", result:\n\n"
                description += trim_for_log(read_stdout_file())
                description += "\n"

            description += f"\nstdout:\n{stdout}\n"
            return TestResult(
                self.name, TestStatus.FAIL, reason, total_time, description
            )

    if stderr:
        description += "\n"
        description += trim_for_log(stderr)
        description += "\n"
        description += "\nstdout:\n"
        description += trim_for_log(stdout)
        description += "\n"
        description = with_debug_log(description)
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.STDERR,
            total_time,
            description,
        )

    if "Exception" in stdout:
        description += "\n"
        description += trim_for_log(stdout)
        description += "\n"
        description = with_debug_log(description)
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.EXCEPTION,
            total_time,
            description,
        )

    if "@@SKIP@@" in stdout:
        # The test asked to be skipped; the rest of stdout is the reason.
        skip_reason = stdout.replace("@@SKIP@@", "").rstrip("\n")
        description += " - "
        description += skip_reason
        return TestResult(
            self.name,
            TestStatus.SKIPPED,
            FailureReason.SKIP,
            total_time,
            description,
        )

    if self.reference_file is None:
        return TestResult(
            self.name,
            TestStatus.UNKNOWN,
            FailureReason.NO_REFERENCE,
            total_time,
            description,
        )

    # `diff -q` exits with a non-zero status when the files differ.
    result_is_different = subprocess.call(
        ["diff", "-q", self.reference_file, self.stdout_file], stdout=PIPE
    )

    if result_is_different:
        diff = Popen(
            [
                "diff",
                "-U",
                str(self.testcase_args.unified),
                self.reference_file,
                self.stdout_file,
            ],
            stdout=PIPE,
            universal_newlines=True,
        ).communicate()[0]
        if diff.startswith("Binary files "):
            # diff refuses to print binary content, so show it ourselves.
            diff += "Content of stdout:\n===================\n"
            diff += read_stdout_file()
            diff += "==================="
        description += f"\n{diff}\n"
        description = with_debug_log(description)
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.RESULT_DIFF,
            total_time,
            description,
        )

    if (
        self.testcase_args.test_runs > 1
        and total_time > 120
        and "long" not in self.tags
    ):
        description = with_debug_log(description)
        # We're in Flaky Check mode, check the run time as well while we're at it.
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.TOO_LONG,
            total_time,
            description,
        )

    # Success: the per-test artifacts are no longer needed.
    for artifact in (
        self.stdout_file,
        self.stderr_file,
        self.testcase_args.debug_log_file,
    ):
        if os.path.exists(artifact):
            os.remove(artifact)

    return TestResult(self.name, TestStatus.OK, None, total_time, description)
@staticmethod
def print_test_time(test_time) -> str:
    """
    Format the test duration for the status line.

    Returns " <seconds> sec." with two decimals when the global
    `args.print_time` is set, otherwise an empty string.
    """
    return f" {test_time:.2f} sec." if args.print_time else ""
def process_result(self, result: TestResult, messages):
    """
    Replace `result.description` with the full status line for the test.

    The line is built from the status label in `messages`, the optional
    run time, the failure reason (if any) and the original description.
    For failed tests with known testcase args the database name is appended.

    :param result: result to decorate; modified in place
    :param messages: mapping from test status to its printable label
    :return: the same `result` object
    """
    parts = [messages[result.status], self.print_test_time(result.total_time)]

    if result.reason is not None:
        parts.append(" - ")
        parts.append(result.reason.value)

    parts.append(result.description)
    parts.append("\n")

    if result.status == TestStatus.FAIL and self.testcase_args:
        parts.append("Database: " + self.testcase_args.testcase_database)

    result.description = "".join(parts)
    return result
@staticmethod
def send_test_name_failed(suite: str, case: str):
    """
    Send a trivial SELECT that names the test about to run and this pid.

    Any exception raised by clickhouse_execute propagates to the caller.
    """
    pid = os.getpid()
    announcement = f"SELECT 'Running test {suite}/{case} from pid={pid}'"
    clickhouse_execute(args, announcement, retry_error_codes=True)
2022-04-28 11:26:49 +00:00
def run_single_test(
    self, server_logs_level, client_options
) -> Tuple[Optional[Popen], str, str, str, float]:
    """
    Run the test file once through the shell and collect its output.

    Steps, in order: start the test command, wait for it until
    `args.timeout` seconds have passed since `testcase_start_time`, read
    the debug log, optionally drop the test database, normalize database
    and host names in the output files with sed, and read stdout/stderr
    back from their files.

    :param server_logs_level: value passed as --send_logs_level for .sql tests
    :param client_options: extra client options inserted into the command line
    :return: (proc, stdout, stderr, debug_log, total_time). `proc` is None
        when dropping the database timed out; in that case stdout is empty
        and stderr holds the timeout message.
    """
    args = self.testcase_args
    client = args.testcase_client
    start_time = args.testcase_start_time
    database = args.testcase_database

    # This is for .sh tests
    os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename

    params = {
        "client": client + " --database=" + database,
        "logs_level": server_logs_level,
        "options": client_options,
        "test": self.case_file,
        "stdout": self.stdout_file,
        "stderr": self.stderr_file,
        "secure": "--secure" if args.secure else "",
    }

    # >> append to stderr (but not stdout since it is not used there),
    # because there are also output of per test database creation
    pattern = "{test} > {stdout} 2> {stderr}"

    # .sql tests are fed to the client on stdin; other tests are executed
    # directly.
    if self.ext == ".sql":
        pattern = (
            "{client} --send_logs_level={logs_level} {secure} --multiquery {options} < "
            + pattern
        )

    command = pattern.format(**params)

    proc = Popen(command, shell=True, env=os.environ)

    # Poll until the process exits or the timeout (counted from
    # start_time) expires. On timeout proc.returncode stays None.
    while (
        datetime.now() - start_time
    ).total_seconds() < args.timeout and proc.poll() is None:
        sleep(0.01)

    # Only a database created for this test is dropped, never one given
    # explicitly via args.database.
    need_drop_database = not args.database
    if need_drop_database and args.no_drop_if_fail:
        # NOTE(review): Popen above is created without stdout/stderr pipes,
        # so proc.stderr and proc.stdout are always None here and this
        # reduces to `proc.returncode == 0` - confirm that is intended.
        maybe_passed = (
            (proc.returncode == 0)
            and (proc.stderr is None)
            and (proc.stdout is None or "Exception" not in proc.stdout)
        )
        need_drop_database = maybe_passed

    debug_log = ""
    if os.path.exists(self.testcase_args.debug_log_file):
        with open(self.testcase_args.debug_log_file, "rb") as stream:
            debug_log += self.testcase_args.debug_log_file + ":\n"
            debug_log += str(stream.read(), errors="replace", encoding="utf-8")
            debug_log += "\n"

    if need_drop_database:
        # Give the DROP at least 20 seconds even if the test used up its
        # whole timeout.
        seconds_left = max(
            args.timeout - (datetime.now() - start_time).total_seconds(), 20
        )

        drop_database_query = "DROP DATABASE IF EXISTS " + database
        if args.replicated_database:
            drop_database_query += " ON CLUSTER test_cluster_database_replicated"

        try:
            # It's possible to get an error "New table appeared in database being dropped or detached. Try again."
            # NOTE(review): if every one of the 59 attempts fails with a
            # retryable error, the loop ends without raising.
            for _ in range(1, 60):
                try:
                    clickhouse_execute(
                        args,
                        drop_database_query,
                        timeout=seconds_left,
                        settings={
                            "log_comment": args.testcase_basename,
                        },
                    )
                except HTTPError as e:
                    if need_retry(args, e.message, e.message, 0):
                        continue
                    raise
                break

        except socket.timeout:
            total_time = (datetime.now() - start_time).total_seconds()
            return (
                None,
                "",
                f"Timeout dropping database {database} after test",
                debug_log,
                total_time,
            )
        shutil.rmtree(args.test_tmp_dir)

    total_time = (datetime.now() - start_time).total_seconds()

    # Normalize randomized database names in stdout, stderr files.
    os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stdout_file}")
    if args.hide_db_name:
        os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stderr_file}")
    if args.replicated_database:
        os.system(f"LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {self.stdout_file}")
        os.system(f"LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {self.stdout_file}")

    # Normalize hostname in stdout file.
    os.system(
        f"LC_ALL=C sed -i -e 's/{socket.gethostname()}/localhost/g' {self.stdout_file}"
    )

    # Read both files as bytes and decode leniently, so unexpected binary
    # output cannot raise here.
    stdout = ""
    if os.path.exists(self.stdout_file):
        with open(self.stdout_file, "rb") as stdfd:
            stdout = str(stdfd.read(), errors="replace", encoding="utf-8")

    stderr = ""
    if os.path.exists(self.stderr_file):
        with open(self.stderr_file, "rb") as stdfd:
            stderr += str(stdfd.read(), errors="replace", encoding="utf-8")

    return proc, stdout, stderr, debug_log, total_time
def run(self, args, suite, client_options, server_logs_level):
    """
    Run this test case once and return its TestResult.

    Skipped tests return immediately. When `args.testname` is set the test
    name is first sent to the server; a failure there is reported as
    SERVER_DIED. Exceptions from the run are mapped to results: HTTPError
    to INTERNAL_QUERY_FAIL, connection errors to SERVER_DIED, any other
    Exception to UNKNOWN/INTERNAL_ERROR. KeyboardInterrupt is re-raised.
    The random settings are always removed from the environment at the end.
    """
    try:
        skip_reason = self.should_skip_test(suite)
        if skip_reason is not None:
            return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0.0, "")

        if args.testname:
            try:
                self.send_test_name_failed(suite.suite, self.case)
            except Exception:
                return TestResult(
                    self.name,
                    TestStatus.FAIL,
                    FailureReason.SERVER_DIED,
                    0.0,
                    "\nServer does not respond to health check\n",
                )

        self.runs_count += 1
        self.testcase_args = self.configure_testcase_args(
            args, self.case_file, suite.suite_tmp_path
        )
        client_options = self.add_random_settings(client_options)

        run_output = self.run_single_test(server_logs_level, client_options)
        proc, stdout, stderr, debug_log, total_time = run_output

        outcome = self.process_result_impl(
            proc, stdout, stderr, debug_log, total_time
        )
        outcome.check_if_need_retry(args, stdout, stderr, self.runs_count)
        # to avoid breaking CSV parser
        outcome.description = outcome.description.replace("\0", "")

        if outcome.status == TestStatus.FAIL:
            outcome.description = self.add_info_about_settings(outcome.description)
        return outcome
    except KeyboardInterrupt:
        raise
    except HTTPError:
        details = self.get_description_from_exception_info(sys.exc_info())
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.INTERNAL_QUERY_FAIL,
            0.0,
            self.add_info_about_settings(details),
        )
    except (ConnectionError, http.client.ImproperConnectionState):
        details = self.get_description_from_exception_info(sys.exc_info())
        return TestResult(
            self.name,
            TestStatus.FAIL,
            FailureReason.SERVER_DIED,
            0.0,
            self.add_info_about_settings(details),
        )
    except Exception:
        details = self.get_description_from_exception_info(sys.exc_info())
        return TestResult(
            self.name,
            TestStatus.UNKNOWN,
            FailureReason.INTERNAL_ERROR,
            0.0,
            details,
        )
    finally:
        self.remove_random_settings_from_env()
class TestSuite:
    """
    One directory of tests: the list of test files to run, their tags and
    the split into sequential and parallel tests.
    """

    @staticmethod
    def tests_in_suite_key_func(item: str) -> float:
        """
        Sort key for test file names inside a suite.

        Uses the global `args.order`: "random" gives a random key, otherwise
        the numeric prefix before the first "_" is used (negated unless the
        order is "asc"). Names without "_" or without a numeric prefix get
        the fixed keys 99998 and 99997.
        """
        if args.order == "random":
            return random.random()

        reverse = 1 if args.order == "asc" else -1

        if -1 == item.find("_"):
            return 99998

        prefix, _ = item.split("_", 1)

        try:
            return reverse * int(prefix)
        except ValueError:
            return 99997

    @staticmethod
    def render_test_template(j2env, suite_dir, test_name):
        """
        Render template for test and reference file if needed

        Returns the name of the file to run: the generated ".gen.sql" for a
        ".sql.j2" test, otherwise `test_name` unchanged. With `j2env` None
        nothing is rendered.
        """
        if j2env is None:
            return test_name

        test_base_name = removesuffix(test_name, ".sql.j2", ".sql")

        reference_file_name = test_base_name + ".reference.j2"
        reference_file_path = os.path.join(suite_dir, reference_file_name)
        if os.path.isfile(reference_file_path):
            tpl = j2env.get_template(reference_file_name)
            tpl.stream().dump(
                os.path.join(suite_dir, test_base_name) + ".gen.reference"
            )

        if test_name.endswith(".sql.j2"):
            tpl = j2env.get_template(test_name)
            generated_test_name = test_base_name + ".gen.sql"
            tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
            return generated_test_name

        return test_name

    @staticmethod
    def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
        """
        Read the "Tags:" comment of every test file.

        The tag line is the first non-empty line that is not a shebang. It
        must start with the comment sign of the file type followed by
        "Tags:"; the rest is a comma-separated list of tags.

        :return: mapping from test file name to its set of tags; tests
            without tags are absent from the mapping
        :raises Exception: for a file name with an unknown extension
        """

        def get_comment_sign(filename):
            if filename.endswith((".sql", ".sql.j2")):
                return "--"
            if filename.endswith((".sh", ".py", ".expect")):
                return "#"
            raise Exception(f"Unknown file_extension: {filename}")

        def parse_tags_from_line(line, comment_sign):
            if not line.startswith(comment_sign):
                return None
            tags_str = line[len(comment_sign) :].lstrip()  # noqa: ignore E203
            tags_prefix = "Tags:"
            if not tags_str.startswith(tags_prefix):
                return None
            tags_str = tags_str[len(tags_prefix) :]  # noqa: ignore E203
            return {tag.strip() for tag in tags_str.split(",")}

        def is_shebang(line: str) -> bool:
            return line.startswith("#!")

        def find_tag_line(file):
            for line in file:
                line = line.strip()
                if line and not is_shebang(line):
                    return line
            return ""

        def load_tags_from_file(filepath):
            comment_sign = get_comment_sign(filepath)
            with open(filepath, "r", encoding="utf-8") as file:
                try:
                    line = find_tag_line(file)
                except UnicodeDecodeError:
                    # Not valid UTF-8: treat as a test without tags, like
                    # every other "no tags" path in this function.
                    return None
            return parse_tags_from_line(line, comment_sign)

        all_tags = {}
        start_time = datetime.now()
        for test_name in all_tests:
            tags = load_tags_from_file(os.path.join(suite_dir, test_name))
            if tags:
                all_tags[test_name] = tags
        elapsed = (datetime.now() - start_time).total_seconds()
        if elapsed > 1:
            print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
        return all_tags

    def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
        """
        Collect the tests of the suite, read their tags and split them into
        sequential and parallel lists.

        :raises Exception: when `args.run_by_hash_num` is bigger than
            `args.run_by_hash_total`
        """
        self.args = args
        self.suite_path: str = suite_path
        self.suite_tmp_path: str = suite_tmp_path
        self.suite: str = suite

        filter_func = lambda x: True  # noqa: ignore E731

        if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
            if args.run_by_hash_num > args.run_by_hash_total:
                raise Exception(
                    f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}"
                )

            # Keep only the tests whose name hash falls into this shard.
            filter_func = (
                lambda x: stringhash(x) % args.run_by_hash_total == args.run_by_hash_num
            )

        self.all_tests: List[str] = self.get_tests_list(
            self.tests_in_suite_key_func, filter_func
        )
        self.all_tags: Dict[str, Set[str]] = self.read_test_tags(
            self.suite_path, self.all_tests
        )

        self.sequential_tests = []
        self.parallel_tests = []
        for test_name in self.all_tests:
            if self.is_sequential_test(test_name):
                self.sequential_tests.append(test_name)
            else:
                self.parallel_tests.append(test_name)

    def is_sequential_test(self, test_name):
        """
        Return True when the test must not run in parallel: its name matches
        one of the `sequential` patterns of the suite's args, or it is tagged
        "no-parallel" or "sequential".
        """
        # Use the args this suite was constructed with, like the other
        # instance methods, instead of the module-level global.
        sequential_patterns = self.args.sequential
        if sequential_patterns:
            if any(s in test_name for s in sequential_patterns):
                return True

        if test_name not in self.all_tags:
            return False

        tags = self.all_tags[test_name]
        return ("no-parallel" in tags) or ("sequential" in tags)

    def get_tests_list(self, sort_key, filter_func):
        """
        Return list of tests file names to run

        Every selected test is repeated `args.test_runs` times and the list
        is sorted with `sort_key`.
        """
        all_tests = list(self.get_selected_tests(filter_func))
        all_tests = all_tests * self.args.test_runs
        all_tests.sort(key=sort_key)
        return all_tests

    def get_selected_tests(self, filter_func):
        """
        Find all files with tests, filter, render templates
        """
        j2env = (
            jinja2.Environment(
                loader=jinja2.FileSystemLoader(self.suite_path),
                keep_trailing_newline=True,
            )
            if USE_JINJA
            else None
        )

        for test_name in os.listdir(self.suite_path):
            if not is_test_from_dir(self.suite_path, test_name):
                continue
            if self.args.test and not any(
                re.search(pattern, test_name) for pattern in self.args.test
            ):
                continue
            # Generated files are produced from their templates below, so
            # they are not picked up as tests on their own.
            if USE_JINJA and test_name.endswith(".gen.sql"):
                continue
            if not filter_func(test_name):
                continue
            test_name = self.render_test_template(j2env, self.suite_path, test_name)
            yield test_name

    @staticmethod
    def read_test_suite(args, suite_dir_name: str):
        """
        Build a TestSuite for `suite_dir_name` inside `args.queries`.

        Returns None when the name does not look like "<digits>_<suite>",
        when it is not a directory, or when the suite is disabled (stateless
        or stateful tests switched off, or stateful data not loaded). The
        temporary directory for the suite is created if missing.
        """

        def is_data_present():
            try:
                return int(clickhouse_execute(args, "EXISTS TABLE test.hits"))
            except Exception as e:
                print(
                    "Cannot check if dataset is available, assuming it's not: ", str(e)
                )
                return False

        base_dir = os.path.abspath(args.queries)
        tmp_dir = os.path.abspath(args.tmp)
        suite_path = os.path.join(base_dir, suite_dir_name)

        suite_re_obj = re.search("^[0-9]+_(.*)$", suite_dir_name)
        if not suite_re_obj:  # skip .gitignore and so on
            return None

        suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
        if not os.path.exists(suite_tmp_path):
            os.makedirs(suite_tmp_path)

        suite = suite_re_obj.group(1)

        if not os.path.isdir(suite_path):
            return None

        if "stateful" in suite and not args.no_stateful and not is_data_present():
            print("Won't run stateful tests because test data wasn't loaded.")
            return None
        if "stateless" in suite and args.no_stateless:
            print("Won't run stateless tests because they were manually disabled.")
            return None
        if "stateful" in suite and args.no_stateful:
            print("Won't run stateful tests because they were manually disabled.")
            return None

        return TestSuite(args, suite_path, suite_tmp_path, suite)
2021-08-06 14:38:28 +00:00
# Module-level state shared by the test-running functions. Everything starts
# as None here.
# run_tests_array() declares stop_time, exit_code, server_died and
# restarted_tests as globals; `queue` is what do_run_tests() fills with test
# names and what run_tests_array() reads from in worker processes.
# NOTE(review): presumably these are given real values before any test runs
# (the code below calls server_died.is_set(), exit_code.value, queue.get()) -
# confirm where that happens.
stop_time = None
exit_code = None
server_died = None
stop_tests_triggered_lock = None
stop_tests_triggered = None
queue = None
multiprocessing_manager = None
restarted_tests = None
2022-04-27 11:17:54 +00:00
2022-04-28 11:26:49 +00:00
def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite]):
    """
    Run a batch of tests and print a summary for it.

    :param all_tests_with_params: (all_tests, num_tests, test_suite). In the
        main process tests are popped from `all_tests` (the list is consumed);
        in any other process test names are taken from the global `queue`
        until a falsy item arrives, and `num_tests` is only an estimate used
        in the banner.

    The loop stops early when the server died, the global time limit is
    exceeded, or `args.max_failures_chain` consecutive tests failed. If any
    test failed, `exit_code.value` is set to 1.
    """
    all_tests, num_tests, test_suite = all_tests_with_params
    global stop_time, exit_code, server_died, restarted_tests

    open_bracket = colored("[", args, attrs=["bold"])
    close_bracket = colored("]", args, attrs=["bold"])

    def status_label(text, color):
        return open_bracket + colored(text, args, color, attrs=["bold"]) + close_bracket

    MESSAGES = {
        TestStatus.FAIL: status_label(" FAIL ", "red"),
        TestStatus.UNKNOWN: status_label(" UNKNOWN ", "yellow"),
        TestStatus.OK: status_label(" OK ", "green"),
        TestStatus.SKIPPED: status_label(" SKIPPED ", "cyan"),
    }

    passed_total = 0
    skipped_total = 0
    failures_total = 0
    failures_chain = 0
    start_time = datetime.now()

    proc_name = multiprocessing.current_process().name
    is_concurrent = proc_name != "MainProcess"

    client_options = get_additional_client_options(args)

    if num_tests > 0:
        about = "about " if is_concurrent else ""
        print(f"\nRunning {about}{num_tests} {test_suite.suite} tests ({proc_name}).\n")

    while True:
        # Pick the next test: from the shared queue in a worker, from the
        # local list in the main process.
        if is_concurrent:
            case = queue.get(timeout=args.timeout * 1.1)
            if not case:
                break
        elif all_tests:
            case = all_tests.pop(0)
        else:
            break

        if server_died.is_set():
            stop_tests()
            break

        if stop_time and time() > stop_time:
            print("\nStop tests run because global time limit is exceeded.\n")
            stop_tests()
            break

        test_case = TestCase(test_suite, case, args, is_concurrent)

        try:
            description = ""
            test_case_name = removesuffix(test_case.name, ".gen", ".sql") + ": "

            if is_concurrent:
                description = f"{test_case_name:72}"
            else:
                sys.stdout.flush()
                sys.stdout.write(f"{test_case_name:72}")
                # This flush is needed so you can see the test name of the long
                # running test before it will finish. But don't do it in parallel
                # mode, so that the lines don't mix.
                sys.stdout.flush()

            # Re-run the test for as long as its result asks for a retry.
            while True:
                test_result = test_case.run(
                    args, test_suite, client_options, server_logs_level
                )
                test_result = test_case.process_result(test_result, MESSAGES)
                if not test_result.need_retry:
                    break
                restarted_tests.append(test_result)

            # First print the description, than invoke the check result logic
            description += test_result.description

            if description and not description.endswith("\n"):
                description += "\n"

            sys.stdout.write(description)
            sys.stdout.flush()

            status = test_result.status
            if status == TestStatus.OK:
                passed_total += 1
                failures_chain = 0
            elif status == TestStatus.FAIL:
                failures_total += 1
                failures_chain += 1
                if test_result.reason == FailureReason.SERVER_DIED:
                    server_died.set()
                    stop_tests()
            elif status == TestStatus.SKIPPED:
                skipped_total += 1

        except KeyboardInterrupt:
            print(colored("Break tests execution", args, "red"))
            stop_tests()
            raise

        if failures_chain >= args.max_failures_chain:
            stop_tests()
            break

    if failures_total > 0:
        print(
            colored(
                f"\nHaving {failures_total} errors! {passed_total} tests passed."
                f" {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "red",
                attrs=["bold"],
            )
        )
        exit_code.value = 1
    else:
        print(
            colored(
                f"\n{passed_total} tests passed. {skipped_total} tests skipped."
                f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
                f" ({multiprocessing.current_process().name}).",
                args,
                "green",
                attrs=["bold"],
            )
        )

    sys.stdout.flush()
# Log level that run_tests_array() passes to TestCase.run(); for .sql tests it
# ends up in the client's --send_logs_level option.
server_logs_level = "warning"
def check_server_started(args):
    """
    Wait until the server answers "SELECT 1".

    Connection errors are retried up to `args.server_check_retries` times
    with a 0.5 second pause; a timeout or any other exception stops the
    check immediately.

    :return: True when the server answered, False otherwise
    """
    print("Connecting to ClickHouse server...", end="")
    sys.stdout.flush()

    attempts_left = args.server_check_retries
    while attempts_left > 0:
        try:
            clickhouse_execute(args, "SELECT 1", max_http_retries=1)
            print(" OK")
            sys.stdout.flush()
            return True
        except (ConnectionError, http.client.ImproperConnectionState) as e:
            # Verbose in hung-check mode, a progress dot otherwise.
            if args.hung_check:
                print("Connection error, will retry: ", str(e))
            else:
                print(".", end="")
            sys.stdout.flush()
            attempts_left -= 1
            sleep(0.5)
        except TimeoutError:
            print("\nConnection timeout, will not retry")
            break
        except Exception as e:
            print(
                "\nUexpected exception, will not retry: ",
                type(e).__name__,
                ": ",
                str(e),
            )
            break

    print("\nAll connection tries failed")
    sys.stdout.flush()
    return False
2022-04-27 11:02:45 +00:00
class BuildFlags:
    """Names of the build flags that collect_build_flags() can report."""

    # Sanitizers, detected from CXX_FLAGS.
    THREAD = "tsan"
    ADDRESS = "asan"
    UNDEFINED = "ubsan"
    MEMORY = "msan"

    # Build type, detected from BUILD_TYPE.
    DEBUG = "debug"
    RELEASE = "release"

    # Derived from server settings.
    ORDINARY_DATABASE = "ordinary-database"
    POLYMORPHIC_PARTS = "polymorphic-parts"
2020-07-03 10:57:16 +00:00
def collect_build_flags(args):
    """
    Ask the server about its build and settings and return the list of
    build flags: at most one sanitizer, at most one build type,
    "ordinary-database", "polymorphic-parts", every enabled USE_* build
    option in lower case, and "cpu-<processor>" when the processor is known.
    """
    flags = []

    cxx_flags = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'"
    )
    # Only the first matching sanitizer is reported.
    sanitizers = (
        (b"-fsanitize=thread", BuildFlags.THREAD),
        (b"-fsanitize=address", BuildFlags.ADDRESS),
        (b"-fsanitize=undefined", BuildFlags.UNDEFINED),
        (b"-fsanitize=memory", BuildFlags.MEMORY),
    )
    for marker, flag in sanitizers:
        if marker in cxx_flags:
            flags.append(flag)
            break

    build_type = clickhouse_execute(
        args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
    )
    if b"Debug" in build_type:
        flags.append(BuildFlags.DEBUG)
    elif b"RelWithDebInfo" in build_type or b"Release" in build_type:
        flags.append(BuildFlags.RELEASE)

    ordinary_allowed = clickhouse_execute(
        args,
        "SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'",
    )
    if ordinary_allowed == b"1" or args.db_engine == "Ordinary":
        flags.append(BuildFlags.ORDINARY_DATABASE)

    min_bytes_for_wide_part = int(
        clickhouse_execute(
            args,
            "SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'",
        )
    )
    if min_bytes_for_wide_part == 0:
        flags.append(BuildFlags.POLYMORPHIC_PARTS)

    use_flags = clickhouse_execute(
        args,
        "SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')",
    )
    flags.extend(
        use_flag.decode().lower() for use_flag in use_flags.strip().splitlines()
    )

    system_processor = clickhouse_execute(
        args,
        "SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1",
    ).strip()
    if system_processor:
        flags.append(f"cpu-{system_processor.decode().lower()}")

    return flags
2022-04-27 11:02:45 +00:00
2022-07-07 22:16:01 +00:00
def collect_changed_merge_tree_settings(args):
    """Return the names of the MergeTree settings the server reports as changed."""
    response = clickhouse_execute(
        args,
        "SELECT name FROM system.merge_tree_settings WHERE changed",
    )
    return [name.decode() for name in response.strip().splitlines()]
def check_table_column(args, database, table, column):
    """Return True when system.columns lists `column` for `database`.`table`."""
    query = f"""
SELECT count()
FROM system.columns
WHERE database = '{database}' AND table = '{table}' AND name = '{column}'
"""
    matching_columns = int(clickhouse_execute(args, query))
    return matching_columns > 0
2020-07-03 10:57:16 +00:00
2022-04-28 11:26:49 +00:00
def suite_key_func(item: str) -> Union[float, Tuple[int, str]]:
    """
    Sort key for suite directory names.

    With the global `args.order` set to "random" the key is a random float.
    Otherwise "<number>_<rest>" gives (number, rest); names without "_" or
    with a non-numeric prefix get (99998, "") and (99997, "").
    """
    if args.order == "random":
        return random.random()

    prefix, separator, suffix = item.partition("_")
    if not separator:
        return 99998, ""

    try:
        return int(prefix), suffix
    except ValueError:
        return 99997, ""
def extract_key(key: str) -> str:
    """
    Run the `args.extract_from_config` command for `key` and return its
    output. `key` is appended directly after `args.configserver`; the exit
    status of the command is ignored.
    """
    command = "".join(
        (args.extract_from_config, " --try --config ", args.configserver, key)
    )
    _status, output = subprocess.getstatusoutput(command)
    return output
def do_run_tests(jobs, test_suite: TestSuite, parallel):
    """
    Run all tests of the suite and return how many tests were scheduled.

    With more than one job and at least one parallel test, the parallel
    tests are fed through the global `queue` to a pool of worker processes
    running run_tests_array(), and the sequential tests are run afterwards
    in this process. Otherwise every test is run in this process.

    :param jobs: requested number of worker processes
    :param test_suite: suite whose tests are run
    :param parallel: string of the form "N/M"; it is parsed but the values
        are not used here
    :return: number of sequential plus parallel tests (or of all tests in
        the single-process case)
    """
    if jobs > 1 and len(test_suite.parallel_tests) > 0:
        print(
            "Found",
            len(test_suite.parallel_tests),
            "parallel tests and",
            len(test_suite.sequential_tests),
            "sequential tests",
        )
        # The parsed values are unused; parsing is kept so that a malformed
        # `parallel` value still fails here as before.
        run_n, run_total = parallel.split("/")
        float(run_n)
        float(run_total)

        tests_n = len(test_suite.parallel_tests)
        jobs = min(jobs, tests_n)

        batch_size = max(1, tests_n // jobs)
        # Workers get no list of their own (None): they read the queue.
        parallel_tests_array = [(None, batch_size, test_suite) for _ in range(jobs)]

        try:
            with closing(multiprocessing.Pool(processes=jobs)) as pool:
                pool.map_async(run_tests_array, parallel_tests_array)

                for suit in test_suite.parallel_tests:
                    queue.put(suit, timeout=args.timeout * 1.1)

                # One end marker per worker.
                for _ in range(jobs):
                    queue.put(None, timeout=args.timeout * 1.1)

                queue.close()
        except Full:
            print(
                "Couldn't put test to the queue within timeout. Server probably hung."
            )
            print_stacktraces()
            queue.close()

        pool.join()

        # run_tests_array() pops tests off the list it is given, so the
        # count has to be taken before the sequential run.
        sequential_count = len(test_suite.sequential_tests)
        run_tests_array((test_suite.sequential_tests, sequential_count, test_suite))

        return sequential_count + len(test_suite.parallel_tests)
    else:
        num_tests = len(test_suite.all_tests)
        run_tests_array((test_suite.all_tests, num_tests, test_suite))
        return num_tests
2021-05-20 18:11:12 +00:00
def is_test_from_dir(suite_dir, case):
    """
    Return True when `case` is a regular file in `suite_dir` whose name ends
    with one of TEST_FILE_EXTENSIONS.
    """
    case_file = os.path.join(suite_dir, case)
    # We could also test for executable files (os.access(case_file, os.X_OK),
    # but it interferes with 01610_client_spawn_editor.editor, which is invoked
    # as a query editor in the test, and must be marked as executable.
    if not os.path.isfile(case_file):
        return False
    return case_file.endswith(tuple(TEST_FILE_EXTENSIONS))
def removesuffix(text, *suffixes):
    """
    Added in python 3.9
    https://www.python.org/dev/peps/pep-0616/

    This version can work with several possible suffixes: the first
    non-empty suffix that `text` ends with is removed, and at most one
    suffix is removed. Without a match `text` is returned unchanged.
    """
    matched = next(
        (suffix for suffix in suffixes if suffix and text.endswith(suffix)), None
    )
    if matched is None:
        return text
    return text[: len(text) - len(matched)]
def reportCoverageFor(args, what, query, permissive=False):
    """
    Run `query` and print its output as the list of `what` that were not
    covered by tests.

    :return: True when the query returned nothing; otherwise `permissive`
    """
    uncovered = clickhouse_execute(args, query).decode()

    if uncovered == "":
        return True

    print(f"\nThe following {what} were not covered by tests:\n")
    print(uncovered)
    print("\n")
    return permissive
2022-08-26 21:57:14 +00:00
def reportCoverage(args):
    """
    Flush the server logs and report functions, aggregate functions,
    aggregate function combinators and data type families that do not
    appear in system.query_log since yesterday.

    The checks run in order and stop at the first one that fails; the
    check for ordinary functions is permissive (it reports but never fails).

    :return: True when every executed check passed
    """
    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    # (what, query, permissive)
    checks = (
        (
            "functions",
            """
SELECT name
FROM system.functions
WHERE NOT is_aggregate AND origin = 'System' AND alias_to = ''
AND name NOT IN
(
SELECT arrayJoin(used_functions) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
""",
            True,
        ),
        (
            "aggregate functions",
            """
SELECT name
FROM system.functions
WHERE is_aggregate AND origin = 'System' AND alias_to = ''
AND name NOT IN
(
SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
""",
            False,
        ),
        (
            "aggregate function combinators",
            """
SELECT name
FROM system.aggregate_function_combinators
WHERE NOT is_internal
AND name NOT IN
(
SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
""",
            False,
        ),
        (
            "data type families",
            """
SELECT name
FROM system.data_type_families
WHERE alias_to = '' AND name NOT LIKE 'Interval%'
AND name NOT IN
(
SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
)
ORDER BY name
""",
            False,
        ),
    )

    # all() over a generator keeps the short-circuit of the original chain.
    return all(
        reportCoverageFor(args, what, query, permissive)
        for what, query, permissive in checks
    )
2023-01-13 19:34:31 +00:00
def reportLogStats(args):
    """Print several statistics reports about recent server log messages.

    Flushes the server logs, then runs a fixed series of queries against
    system.text_log (each looking at the last 120 minutes) and prints every
    result under its own heading. Undecodable bytes in a result are replaced
    rather than raising.

    Any exception from a query propagates to the caller; reports printed
    before the failure stay printed.
    """
    clickhouse_execute(args, "SYSTEM FLUSH LOGS")

    # Each entry is (heading, query); they are executed in this order.
    reports = (
        (
            "\nTop patterns of log messages:\n",
            """
        WITH
            120 AS mins,
            (
                SELECT (count(), sum(length(message)))
                FROM system.text_log
                WHERE (now() - toIntervalMinute(mins)) < event_time
            ) AS total
        SELECT
            count() AS count,
            round(count / (total.1), 3) AS `count_%`,
            formatReadableSize(sum(length(message))) AS size,
            round(sum(length(message)) / (total.2), 3) AS `size_%`,
            countDistinct(logger_name) AS uniq_loggers,
            countDistinct(thread_id) AS uniq_threads,
            groupArrayDistinct(toString(level)) AS levels,
            round(sum(query_id = '') / count, 3) AS `background_%`,
            message_format_string
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time
        GROUP BY message_format_string
        ORDER BY count DESC
        LIMIT 100
        FORMAT TSVWithNamesAndTypes
    """,
        ),
        (
            "\nTop messages without format string (fmt::runtime):\n",
            # Raw string: the regexp contains backslashes ("\/", "\.") that
            # are not valid Python escape sequences. The text sent to the
            # server is unchanged.
            r"""
        WITH
            120 AS mins
        SELECT
            count() AS count,
            substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
            substr(any(message), 1, 256) as runtime_message,
            any((extract(source_file, '\/[a-zA-Z0-9_]+\.[a-z]+'), source_line)) as line
        FROM system.text_log
        WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
        GROUP BY pattern
        ORDER BY count DESC
        LIMIT 30
        FORMAT TSVWithNamesAndTypes
    """,
        ),
        (
            "\nTop messages that does not match its format string:\n",
            """
        SELECT message_format_string, count(), substr(any(message), 1, 120) AS any_message
        FROM system.text_log
        WHERE (now() - toIntervalMinute(120)) < event_time
        AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
        AND (message NOT LIKE concat('%Exception: ', s, '%'))
        GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT TSVWithNamesAndTypes
    """,
        ),
        (
            "\nTop short messages:\n",
            """
        WITH ('', '({}) Keys: {}', '({}) {}', 'Aggregating', 'Became leader', 'Cleaning queue', 'Creating set.',
              'Cyclic aliases', 'Detaching {}', 'Executing {}', 'Fire events: {}', 'Found part {}', 'Loaded queue',
              'No sharding key', 'No tables', 'Query: {}', 'Removed', 'Removed part {}', 'Removing parts.',
              'Request URI: {}', 'Sending part {}', 'Sent handshake', 'Starting {}', 'Will mimic {}', 'Writing to {}',
              'dropIfEmpty', 'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}', 'Query was cancelled',
              'Table {} already exists.', '{}%', 'Cancelled merging parts', 'All replicas are lost',
              'Cancelled mutating parts', 'Read object: {}', 'New segment: {}', 'Unknown geometry type {}',
              'Table {} is not replicated', '{} {}.{} already exists', 'Attempt to read after eof',
              'Replica {} already exists', 'Convert overflow', 'key must be a tuple', 'Division by zero',
              'No part {} in committed state', 'Files set to {}', 'Bytes set to {}', 'Sharding key {} is not used',
              'Cannot parse datetime', 'Bad get: has {}, requested {}', 'There is no {} in {}', 'Numeric overflow',
              'Polygon is not valid: {}', 'Decimal math overflow', '{} only accepts maps', 'Dictionary ({}) not found',
              'Unknown format {}', 'Invalid IPv4 value', 'Invalid IPv6 value', 'Unknown setting {}',
              'Unknown table function {}', 'Database {} already exists.', 'Table {} doesn''t exist',
              'Invalid credentials', 'Part {} already exists', 'Invalid mode: {}', 'Log pulling is cancelled',
              'JOIN {} cannot get JOIN keys', 'Unknown function {}{}', 'Cannot parse IPv6 {}',
              'Not found address of host: {}', '{} must contain a tuple', 'Unknown codec family: {}',
              'Expected const String column', 'Invalid partition format: {}', 'Cannot parse IPv4 {}',
              'AST is too deep. Maximum: {}', 'Array sizes are too large: {}', 'Unable to connect to HDFS: {}',
              'Shutdown is called for table', 'File is not inside {}',
              'Table {} doesn''t exist', 'Database {} doesn''t exist', 'Table {}.{} doesn''t exist',
              'File {} doesn''t exist', 'No such attribute ''{}''', 'User name ''{}'' is reserved'
        ) AS known_short_messages
        SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
        FROM system.text_log
        WHERE (now() - toIntervalMinute(120)) < event_time
            AND (length(message_format_string) < 16
                OR (length(message_format_string) < 30 AND message ilike '%DB::Exception%'))
            AND message_format_string NOT IN known_short_messages
        GROUP BY message_format_string ORDER BY c DESC LIMIT 30 FORMAT TSVWithNamesAndTypes
    """,
        ),
        (
            "\nTop messages by level:\n",
            """
        SELECT max((freq, message_format_string)), level
        FROM (SELECT count() / (SELECT count() FROM system.text_log
              WHERE (now() - toIntervalMinute(120)) < event_time) AS freq,
              min(level) AS level, message_format_string FROM system.text_log
              WHERE (now() - toIntervalMinute(120)) < event_time
              GROUP BY message_format_string ORDER BY freq DESC)
        GROUP BY level
    """,
        ),
    )

    for heading, query in reports:
        value = clickhouse_execute(args, query).decode(errors="replace")
        print(heading)
        print(value)
        print("\n")
2022-08-26 21:57:14 +00:00
def main(args):
    """Run every test suite found in args.queries and exit with the result.

    Steps, in order: verify the server answers, collect build flags and
    settings, export environment defaults for shell tests, create the common
    databases, run the suites, optionally check for hung queries, list
    restarted tests, optionally print log statistics and coverage.

    Does not return: the function always ends in sys.exit() (status 1 when
    no tests were run, otherwise exit_code.value).

    Raises:
        Exception: when the server does not respond before tests start.
    """
    global server_died
    global stop_time
    global exit_code
    global server_logs_level
    global restarted_tests

    if not check_server_started(args):
        msg = "Server is not responding. Cannot execute 'SELECT 1' query."
        if args.hung_check:
            print(msg)
            pid = get_server_pid()
            print("Got server pid", pid)
            print_stacktraces()
        raise Exception(msg)

    args.build_flags = collect_build_flags(args)
    args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)
    args.suppport_system_processes_is_all_data_sent = check_table_column(
        args, "system", "processes", "is_all_data_sent"
    )

    # Settings randomization is switched off for s3 storage runs on
    # thread-sanitizer or debug builds.
    if args.s3_storage and (
        BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags
    ):
        args.no_random_settings = True

    if args.skip:
        args.skip = set(args.skip)

    base_dir = os.path.abspath(args.queries)

    # Keep same default values as in queries/shell_config.sh
    os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
    # os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
    os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)

    if args.configclient:
        os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)

    # Force to print server warnings in stderr
    # Shell scripts could change logging level
    os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)

    # This code is bad as the time is not monotonic
    if args.global_time_limit:
        stop_time = time() + args.global_time_limit

    if args.zookeeper is None:
        args.zookeeper = True

    if args.shard is None:
        args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))

    def create_common_database(args, db_name):
        """Create db_name if it is missing, retrying retryable HTTP errors."""
        create_database_retries = 0
        while create_database_retries < MAX_RETRIES:
            start_time = datetime.now()
            try:
                clickhouse_execute(
                    args,
                    f"CREATE DATABASE IF NOT EXISTS {db_name} "
                    f"{get_db_engine(args, db_name)}",
                    settings=get_create_database_settings(args, None),
                )
            except HTTPError as e:
                total_time = (datetime.now() - start_time).total_seconds()
                # NOTE(review): a non-retryable error ends the loop without
                # raising, so the caller is not told that the database is
                # missing - confirm this is intended.
                if not need_retry(args, e.message, e.message, total_time):
                    break
            else:
                # The database exists now; do not re-issue the statement
                # for the remaining attempts.
                return
            create_database_retries += 1

    try:
        if args.database and args.database != "test":
            create_common_database(args, args.database)

        create_common_database(args, "test")
    except Exception as e:
        print(f"Failed to create databases for tests: {e}")
        server_died.set()

    total_tests_run = 0

    for suite in sorted(os.listdir(base_dir), key=suite_key_func):
        if server_died.is_set():
            break

        test_suite = TestSuite.read_test_suite(args, suite)
        if test_suite is None:
            continue

        total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel)

    if server_died.is_set():
        exit_code.value = 1

    if args.hung_check:
        # Some queries may execute in background for some time after test was finished. This is normal.
        # Poll once per second until the process list is empty or the
        # attempts run out.
        for _ in range(1, 60):
            processlist = get_processlist_with_stacktraces(args)
            if not processlist:
                break
            sleep(1)

        if processlist:
            print(
                colored(
                    "\nFound hung queries in processlist:", args, "red", attrs=["bold"]
                )
            )
            print(json.dumps(processlist, indent=4))
            print(get_transactions_list(args))

            print_stacktraces()
            exit_code.value = 1
        else:
            print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))

    if len(restarted_tests) > 0:
        print("\nSome tests were restarted:\n")

        for test_result in restarted_tests:
            print(f"\n{test_result.case_name:72}: ")
            # replace it with lowercase to avoid parsing retried tests as failed
            for status in TestStatus:
                test_result.description = test_result.description.replace(
                    status.value, status.value.lower()
                )
            print(test_result.description)

    if total_tests_run == 0:
        print("No tests were run.")
        sys.exit(1)
    else:
        print("All tests have finished.")

    if args.report_logs_stats:
        # Log statistics are best-effort: a failure here is reported but
        # does not change the exit code.
        try:
            reportLogStats(args)
        except Exception as e:
            print(f"Failed to get stats about log messages: {e}")

    if args.report_coverage and not reportCoverage(args):
        exit_code.value = 1

    sys.exit(exit_code.value)
2019-01-24 11:02:55 +00:00
def find_binary(name):
    """Locate an executable given its path or bare name.

    Candidates are tried in this order: `name` itself, `name` inside every
    directory listed in the PATH environment variable, then /usr/local/bin
    and /usr/bin. Only regular files (or symlinks to them) with execute
    permission are accepted, so a directory that happens to share the name
    is never returned.

    Returns the first matching path.

    Raises:
        Exception: when no candidate is an executable file.
    """

    def is_executable_file(path):
        # os.access(..., X_OK) alone is also true for searchable directories.
        return os.path.isfile(path) and os.access(path, os.X_OK)

    if is_executable_file(name):
        return name

    # PATH may be unset; treat that the same as an empty search path.
    search_dirs = os.environ.get("PATH", "").split(os.pathsep)

    # maybe it wasn't in PATH
    search_dirs += ["/usr/local/bin", "/usr/bin"]

    for directory in search_dirs:
        bin_path = os.path.join(directory, name)
        if is_executable_file(bin_path):
            return bin_path

    raise Exception(f"{name} was not found in PATH")
2019-06-17 16:50:31 +00:00
2023-03-23 15:33:23 +00:00
def find_clickhouse_command(binary, command):
    """Build the command line that invokes a clickhouse sub-command.

    Returns "<binary>-<command>" when that path is executable, otherwise
    "<binary> <command>".
    """
    dashed_name = "-".join((binary, command))
    if not os.access(dashed_name, os.X_OK):
        # To avoid requiring symlinks (in case you download binary from CI)
        return " ".join((binary, command))
    return dashed_name
2023-03-23 15:33:23 +00:00
def get_additional_client_options(args):
    """Render args.client_option as space-separated "--option" flags.

    Returns an empty string when no client options were given.
    """
    if not args.client_option:
        return ""
    flags = ["--" + option for option in args.client_option]
    return " ".join(flags)
def get_additional_client_options_url(args):
    """Render args.client_option as an "&"-joined URL query fragment.

    Returns an empty string when no client options were given.
    """
    options = args.client_option
    return "&".join(options) if options else ""
def parse_args():
    """Define the command-line interface and parse sys.argv.

    Returns the argparse.Namespace produced by ArgumentParser.parse_args().

    The --binary value (including its default) is resolved through
    find_binary, so an exception from find_binary can propagate out of this
    function.
    """
    parser = ArgumentParser(description="ClickHouse functional tests")
    parser.add_argument("-q", "--queries", help="Path to queries dir")
    parser.add_argument("--tmp", help="Path to tmp dir")

    parser.add_argument(
        "-b",
        "--binary",
        default="clickhouse",
        type=find_binary,
        help="Path to clickhouse binary or name of binary in PATH",
    )
    parser.add_argument(
        "-c",
        "--client",
        help="Path to clickhouse-client or name of binary in PATH "
        "(deprecated and ignored, use --binary instead)",
    )
    parser.add_argument("--extract_from_config", help="extract-from-config program")
    parser.add_argument(
        "--configclient", help="Client config (if you use not default ports)"
    )
    parser.add_argument(
        "--configserver",
        default="/etc/clickhouse-server/config.xml",
        help="Preprocessed server config",
    )
    parser.add_argument(
        "-o", "--output", help="Output xUnit compliant test report directory"
    )
    parser.add_argument(
        "-t",
        "--timeout",
        type=int,
        default=600,
        help="Timeout for each test case in seconds",
    )
    parser.add_argument(
        "--global_time_limit",
        type=int,
        help="Stop if executing more than specified time (after current test finished)",
    )
    parser.add_argument("test", nargs="*", help="Optional test case name regex")
    parser.add_argument(
        "-d",
        "--disabled",
        action="store_true",
        default=False,
        help="Also run disabled tests",
    )
    parser.add_argument(
        "--stop",
        action="store_true",
        default=None,
        dest="stop",
        help="Stop on network errors",
    )
    parser.add_argument(
        "--order", default="desc", choices=["asc", "desc", "random"], help="Run order"
    )
    parser.add_argument(
        "--testname",
        action="store_true",
        default=None,
        dest="testname",
        help="Make query with test name before test run",
    )
    parser.add_argument("--hung-check", action="store_true", default=False)
    parser.add_argument("--no-left-queries-check", action="store_true", default=False)
    parser.add_argument("--force-color", action="store_true", default=False)
    parser.add_argument(
        "--database", help="Database for tests (random name test_XXXXXX by default)"
    )
    parser.add_argument(
        "--no-drop-if-fail",
        action="store_true",
        help="Do not drop database for test if test has failed (does not work if reference file mismatch)",
    )
    parser.add_argument(
        "--hide-db-name",
        action="store_true",
        help='Replace random database name with "default" in stderr',
    )
    parser.add_argument(
        "--parallel", default="1/1", help="One parallel test run number/total"
    )
    # nargs="?" means a bare "-j" yields None rather than the default.
    parser.add_argument(
        "-j", "--jobs", default=1, nargs="?", type=int, help="Run all tests in parallel"
    )
    parser.add_argument(
        "--test-runs",
        default=1,
        nargs="?",
        type=int,
        help="Run each test many times (useful for e.g. flaky check)",
    )
    parser.add_argument(
        "-U",
        "--unified",
        default=3,
        type=int,
        help="output NUM lines of unified context",
    )
    parser.add_argument(
        "-r",
        "--server-check-retries",
        default=180,
        type=int,
        help="Num of tries to execute SELECT 1 before tests started",
    )
    parser.add_argument("--db-engine", help="Database engine name")
    parser.add_argument(
        "--replicated-database",
        action="store_true",
        default=False,
        help="Run tests with Replicated database engine",
    )
    parser.add_argument(
        "--fast-tests-only",
        action="store_true",
        default=False,
        help='Run only fast tests (the tests without the "no-fasttest" tag)',
    )
    parser.add_argument(
        "--no-stateless", action="store_true", help="Disable all stateless tests"
    )
    parser.add_argument(
        "--no-stateful", action="store_true", help="Disable all stateful tests"
    )
    parser.add_argument("--skip", nargs="+", help="Skip these tests")
    parser.add_argument(
        "--sequential",
        nargs="+",
        help="Run these tests sequentially even if --parallel specified",
    )
    parser.add_argument(
        "--no-long", action="store_true", dest="no_long", help="Do not run long tests"
    )
    parser.add_argument(
        "--client-option", nargs="+", help="Specify additional client argument"
    )
    parser.add_argument(
        "--print-time", action="store_true", dest="print_time", help="Print test time"
    )
    parser.add_argument(
        "--check-zookeeper-session",
        action="store_true",
        help="Check ZooKeeper session uptime to determine if failed test should be retried",
    )
    parser.add_argument(
        "--s3-storage",
        action="store_true",
        default=False,
        help="Run tests over s3 storage",
    )
    parser.add_argument(
        "--no-random-settings",
        action="store_true",
        default=False,
        help="Disable settings randomization",
    )
    parser.add_argument(
        "--no-random-merge-tree-settings",
        action="store_true",
        default=False,
        help="Disable MergeTree settings randomization",
    )
    # argparse applies %-formatting to help strings, so a literal percent
    # sign has to be written as "%%".
    parser.add_argument(
        "--run-by-hash-num",
        type=int,
        help="Run tests matching crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )
    parser.add_argument(
        "--run-by-hash-total",
        type=int,
        help="Total test groups for crc32(test_name) %% run_by_hash_total == run_by_hash_num",
    )

    # --zookeeper / --no-zookeeper share one destination; None means
    # "not specified".
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--zookeeper",
        action="store_true",
        default=None,
        dest="zookeeper",
        help="Run zookeeper related tests",
    )
    group.add_argument(
        "--no-zookeeper",
        action="store_false",
        default=None,
        dest="zookeeper",
        help="Do not run zookeeper related tests",
    )

    # --shard / --no-shard share one destination; None means "not specified".
    group = parser.add_mutually_exclusive_group(required=False)
    group.add_argument(
        "--shard",
        action="store_true",
        default=None,
        dest="shard",
        help="Run sharding related tests "
        "(required to clickhouse-server listen 127.0.0.2 127.0.0.3)",
    )
    group.add_argument(
        "--no-shard",
        action="store_false",
        default=None,
        dest="shard",
        help="Do not run shard related tests",
    )
    # NOTE(review): this option is registered on the --shard/--no-shard
    # group, so it cannot be combined with either of them - confirm that
    # this is intended.
    group.add_argument(
        "--upgrade-check",
        action="store_true",
        help="Run tests for further server upgrade testing by ignoring all "
        "drop queries in tests for collecting data from new version of server",
    )
    parser.add_argument(
        "--secure",
        action="store_true",
        default=False,
        help="Use secure connection to connect to clickhouse-server",
    )
    parser.add_argument(
        "--max-failures-chain",
        default=20,
        type=int,
        help="Max number of failed tests in a row (stop tests if higher)",
    )
    parser.add_argument(
        "--report-coverage",
        action="store_true",
        default=False,
        help="Check what high-level server components were covered by tests",
    )
    parser.add_argument(
        "--report-logs-stats",
        action="store_true",
        default=False,
        help="Report statistics about log messages",
    )
    parser.add_argument(
        "--no-parallel-replicas",
        action="store_true",
        default=False,
        help="Do not include tests that are not supported with parallel replicas feature",
    )

    return parser.parse_args()
if __name__ == "__main__":
    # Script entry point: set up shared state, parse and normalise the
    # command-line arguments, export environment variables, then hand over
    # to main().

    # State shared between the runner and its worker processes.
    stop_time = None
    exit_code = multiprocessing.Value("i", 0)
    server_died = multiprocessing.Event()
    stop_tests_triggered_lock = multiprocessing.Lock()
    stop_tests_triggered = multiprocessing.Event()
    queue = multiprocessing.Queue(maxsize=1)
    multiprocessing_manager = multiprocessing.Manager()
    restarted_tests = multiprocessing_manager.list()

    # Move to a new process group and kill it at exit so that we don't have any
    # infinite tests processes left
    # (new process group is required to avoid killing some parent processes)
    os.setpgid(0, 0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGHUP, signal_handler)

    # Exceptions raised while parsing (for example by the find_binary type
    # converter) are reported on stderr without a traceback.
    try:
        args = parse_args()
    except Exception as e:
        print(e, file=sys.stderr)
        sys.exit(1)

    if args.queries and not os.path.isdir(args.queries):
        print(
            f"Cannot access the specified directory with queries ({args.queries})",
            file=sys.stderr,
        )
        sys.exit(1)

    # Autodetect the directory with queries if not specified
    if args.queries is None:
        args.queries = "queries"

        if not os.path.isdir(args.queries):
            # If we're running from the repo
            args.queries = os.path.join(
                os.path.dirname(os.path.abspath(__file__)), "queries"
            )

        if not os.path.isdir(args.queries):
            # Next we're going to try some system directories, don't write 'stdout' files into them.
            if args.tmp is None:
                args.tmp = "/tmp/clickhouse-test"

            args.queries = "/usr/local/share/clickhouse-test/queries"

        if not os.path.isdir(args.queries):
            args.queries = "/usr/share/clickhouse-test/queries"

        if not os.path.isdir(args.queries):
            print(
                "Failed to detect path to the queries directory. Please specify it with "
                "'--queries' option.",
                file=sys.stderr,
            )
            sys.exit(1)

    print("Using queries from '" + args.queries + "' directory")

    if args.tmp is None:
        args.tmp = args.queries

    if args.client:
        print(
            "WARNING: --client option is deprecated and will be removed in the future, use --binary instead",
            file=sys.stderr,
        )

    # Whatever was passed with --client is overwritten: the client command
    # is always derived from --binary.
    args.client = find_clickhouse_command(args.binary, "client")

    if args.extract_from_config:
        print(
            "WARNING: --extract_from_config option is deprecated and will be removed in the future",
            file=sys.stderr,
        )

    # Same for --extract_from_config: always derived from --binary.
    args.extract_from_config = find_clickhouse_command(
        args.binary, "extract-from-config"
    )

    if args.configclient:
        args.client += " --config-file=" + args.configclient

    # Connection parameters: values from the environment win, otherwise
    # defaults that depend on --secure are used.
    tcp_host = os.getenv("CLICKHOUSE_HOST")
    if tcp_host is not None:
        args.tcp_host = tcp_host
        args.client += f" --host={tcp_host}"
    else:
        args.tcp_host = "localhost"

    tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
    if tcp_port is not None:
        args.tcp_port = int(tcp_port)
        args.client += f" --port={tcp_port}"
    else:
        args.tcp_port = 9440 if args.secure else 9000
        if args.secure:
            os.environ["CLICKHOUSE_PORT_TCP"] = str(args.tcp_port)

    http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
    if http_port is not None:
        args.http_port = int(http_port)
    else:
        args.http_port = 8443 if args.secure else 8123
        os.environ["CLICKHOUSE_PORT_HTTP"] = str(args.http_port)
        if args.secure and os.getenv("CLICKHOUSE_PORT_HTTP_PROTO") is None:
            os.environ["CLICKHOUSE_PORT_HTTP_PROTO"] = "https"

    client_database = os.getenv("CLICKHOUSE_DATABASE")
    if client_database is not None:
        args.client += f" --database={client_database}"
        args.client_database = client_database
    else:
        args.client_database = "default"

    if args.upgrade_check:
        args.client += " --fake-drop"

    if args.client_option or args.secure:
        # Set options for client
        if "CLICKHOUSE_CLIENT_OPT" in os.environ:
            os.environ["CLICKHOUSE_CLIENT_OPT"] += " "
        else:
            os.environ["CLICKHOUSE_CLIENT_OPT"] = ""

        os.environ["CLICKHOUSE_CLIENT_OPT"] += get_additional_client_options(args)
        if args.secure:
            os.environ["CLICKHOUSE_CLIENT_OPT"] += " --secure "

        # Set options for curl
        if "CLICKHOUSE_URL_PARAMS" in os.environ:
            os.environ["CLICKHOUSE_URL_PARAMS"] += "&"
        else:
            os.environ["CLICKHOUSE_URL_PARAMS"] = ""

        client_options_query_str = get_additional_client_options_url(args)
        args.client_options_query_str = client_options_query_str + "&"
        os.environ["CLICKHOUSE_URL_PARAMS"] += client_options_query_str
    else:
        args.client_options_query_str = ""

    # A bare "-j" (no value) parses to None: use one job per CPU.
    if args.jobs is None:
        args.jobs = multiprocessing.cpu_count()

    if args.db_engine and args.db_engine == "Ordinary":
        MESSAGES_TO_RETRY.append(" locking attempt on ")

    main(args)