mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
275acb756a
Add missing SYSTEM FLUSH LOGS for log messages statistics
2639 lines
89 KiB
Python
Executable File
2639 lines
89 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
# pylint: disable=too-many-return-statements
|
|
# pylint: disable=global-variable-not-assigned
|
|
# pylint: disable=too-many-lines
|
|
# pylint: disable=anomalous-backslash-in-string
|
|
|
|
import enum
|
|
from queue import Full
|
|
import shutil
|
|
import sys
|
|
import os
|
|
import os.path
|
|
import signal
|
|
import re
|
|
import copy
|
|
import traceback
|
|
import math
|
|
|
|
# Not requests, to avoid requiring extra dependency.
|
|
import http.client
|
|
import urllib.parse
|
|
import json
|
|
|
|
# for crc32
|
|
import zlib
|
|
|
|
from argparse import ArgumentParser
|
|
from typing import Tuple, Union, Optional, Dict, Set, List
|
|
import subprocess
|
|
from subprocess import Popen
|
|
from subprocess import PIPE
|
|
from datetime import datetime
|
|
from time import time, sleep
|
|
from errno import ESRCH
|
|
|
|
try:
|
|
import termcolor # type: ignore
|
|
except ImportError:
|
|
termcolor = None
|
|
|
|
import random
|
|
import string
|
|
import multiprocessing
|
|
import socket
|
|
from contextlib import closing
|
|
|
|
USE_JINJA = True
|
|
try:
|
|
import jinja2
|
|
except ImportError:
|
|
USE_JINJA = False
|
|
print("WARNING: jinja2 not installed! Template tests will be skipped.")
|
|
|
|
MESSAGES_TO_RETRY = [
|
|
"ConnectionPoolWithFailover: Connection failed at try",
|
|
"DB::Exception: New table appeared in database being dropped or detached. Try again",
|
|
"is already started to be removing by another replica right now",
|
|
]
|
|
|
|
MAX_RETRIES = 3
|
|
|
|
TEST_FILE_EXTENSIONS = [".sql", ".sql.j2", ".sh", ".py", ".expect"]
|
|
|
|
VERSION_PATTERN = r"^((\d+\.)?(\d+\.)?(\d+\.)?\d+)$"
|
|
|
|
|
|
def stringhash(s):
|
|
# default hash() function consistent
|
|
# only during process invocation https://stackoverflow.com/a/42089311
|
|
return zlib.crc32(s.encode("utf-8"))
|
|
|
|
|
|
# First and last lines of the log
|
|
def trim_for_log(s):
|
|
if not s:
|
|
return s
|
|
lines = s.splitlines()
|
|
if len(lines) > 10000:
|
|
separator = "-" * 40 + str(len(lines) - 10000) + " lines are hidden" + "-" * 40
|
|
return "\n".join(lines[:5000] + [] + [separator] + [] + lines[-5000:])
|
|
else:
|
|
return "\n".join(lines)
|
|
|
|
|
|
class HTTPError(Exception):
|
|
def __init__(self, message=None, code=None):
|
|
self.message = message
|
|
self.code = code
|
|
super().__init__(message)
|
|
|
|
def __str__(self):
|
|
return f"Code: {self.code}. {self.message}"
|
|
|
|
|
|
# Helpers to execute queries via HTTP interface.
|
|
def clickhouse_execute_http(
|
|
base_args, query, timeout=30, settings=None, default_format=None, max_http_retries=5, retry_error_codes=False
|
|
):
|
|
if args.secure:
|
|
client = http.client.HTTPSConnection(
|
|
host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
|
|
)
|
|
else:
|
|
client = http.client.HTTPConnection(
|
|
host=base_args.tcp_host, port=base_args.http_port, timeout=timeout
|
|
)
|
|
|
|
timeout = int(timeout)
|
|
params = {
|
|
"query": query,
|
|
# hung check in stress tests may remove the database,
|
|
# hence we should use 'system'.
|
|
"database": "system",
|
|
"connect_timeout": timeout,
|
|
"receive_timeout": timeout,
|
|
"send_timeout": timeout,
|
|
"http_connection_timeout": timeout,
|
|
"http_receive_timeout": timeout,
|
|
"http_send_timeout": timeout,
|
|
"output_format_parallel_formatting": 0,
|
|
}
|
|
if settings is not None:
|
|
params.update(settings)
|
|
if default_format is not None:
|
|
params["default_format"] = default_format
|
|
|
|
for i in range(max_http_retries):
|
|
try:
|
|
client.request(
|
|
"POST",
|
|
f"/?{base_args.client_options_query_str}{urllib.parse.urlencode(params)}",
|
|
)
|
|
res = client.getresponse()
|
|
data = res.read()
|
|
if res.status == 200 or (not retry_error_codes):
|
|
break
|
|
except Exception as ex:
|
|
if i == max_http_retries - 1:
|
|
raise ex
|
|
|
|
sleep(i + 1)
|
|
|
|
if res.status != 200:
|
|
raise HTTPError(data.decode(), res.status)
|
|
|
|
return data
|
|
|
|
def clickhouse_execute(base_args, query, timeout=30, settings=None, max_http_retries=5, retry_error_codes=False):
|
|
return clickhouse_execute_http(base_args, query, timeout, settings, max_http_retries=max_http_retries, retry_error_codes=retry_error_codes).strip()
|
|
|
|
|
|
def clickhouse_execute_json(base_args, query, timeout=60, settings=None, max_http_retries=5):
|
|
data = clickhouse_execute_http(base_args, query, timeout, settings, "JSONEachRow", max_http_retries=max_http_retries)
|
|
if not data:
|
|
return None
|
|
rows = []
|
|
for row in data.strip().splitlines():
|
|
rows.append(json.loads(row))
|
|
return rows
|
|
|
|
|
|
class Terminated(KeyboardInterrupt):
|
|
pass
|
|
|
|
|
|
def signal_handler(sig, frame):
|
|
raise Terminated(f"Terminated with {sig} signal")
|
|
|
|
|
|
def stop_tests():
|
|
global stop_tests_triggered_lock
|
|
global stop_tests_triggered
|
|
global restarted_tests
|
|
|
|
with stop_tests_triggered_lock:
|
|
print("Stopping tests")
|
|
if not stop_tests_triggered.is_set():
|
|
stop_tests_triggered.set()
|
|
|
|
# materialize multiprocessing.Manager().list() object before
|
|
# sending SIGTERM since this object is a proxy, that requires
|
|
# communicating with manager thread, but after SIGTERM will be
|
|
# send, this thread will die, and you will get
|
|
# ConnectionRefusedError error for any access to "restarted_tests"
|
|
# variable.
|
|
restarted_tests = [*restarted_tests]
|
|
|
|
# send signal to all processes in group to avoid hung check triggering
|
|
# (to avoid terminating clickhouse-test itself, the signal should be ignored)
|
|
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
|
os.killpg(os.getpgid(os.getpid()), signal.SIGTERM)
|
|
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
|
|
|
|
|
def get_db_engine(args, database_name):
|
|
if args.replicated_database:
|
|
return f" ON CLUSTER test_cluster_database_replicated \
|
|
ENGINE=Replicated('/test/clickhouse/db/{database_name}', \
|
|
'{{shard}}', '{{replica}}')"
|
|
if args.db_engine:
|
|
return " ENGINE=" + args.db_engine
|
|
return "" # Will use default engine
|
|
|
|
|
|
def get_create_database_settings(args, testcase_args):
|
|
create_database_settings = dict()
|
|
if testcase_args:
|
|
create_database_settings["log_comment"] = testcase_args.testcase_basename
|
|
if args.db_engine == "Ordinary":
|
|
create_database_settings["allow_deprecated_database_ordinary"] = 1
|
|
return create_database_settings
|
|
|
|
|
|
def get_zookeeper_session_uptime(args):
|
|
try:
|
|
if args.replicated_database:
|
|
return int(
|
|
clickhouse_execute(
|
|
args,
|
|
"""
|
|
SELECT min(materialize(zookeeperSessionUptime()))
|
|
FROM clusterAllReplicas('test_cluster_database_replicated', system.one)
|
|
""",
|
|
)
|
|
)
|
|
else:
|
|
return int(clickhouse_execute(args, "SELECT zookeeperSessionUptime()"))
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def need_retry(args, stdout, stderr, total_time):
|
|
if args.check_zookeeper_session:
|
|
# Sometimes we may get unexpected exception like "Replica is readonly" or "Shutdown is called for table"
|
|
# instead of "Session expired" or "Connection loss"
|
|
# Retry if session was expired during test execution.
|
|
# If ZooKeeper is configured, then it's more reliable than checking stderr,
|
|
# but the following condition is always true if ZooKeeper is not configured.
|
|
session_uptime = get_zookeeper_session_uptime(args)
|
|
if session_uptime is not None and session_uptime < math.ceil(total_time):
|
|
return True
|
|
return any(msg in stdout for msg in MESSAGES_TO_RETRY) or any(
|
|
msg in stderr for msg in MESSAGES_TO_RETRY
|
|
)
|
|
|
|
|
|
def get_processlist_with_stacktraces(args):
|
|
try:
|
|
if args.replicated_database:
|
|
return clickhouse_execute_json(
|
|
args,
|
|
"""
|
|
SELECT materialize(hostName() || '::' || tcpPort()::String) as host_port, *
|
|
-- NOTE: view() here to do JOIN on shards, instead of initiator
|
|
FROM clusterAllReplicas('test_cluster_database_replicated', view(
|
|
SELECT
|
|
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
|
|
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
|
|
s.trace), '\n') AS stacktrace
|
|
)) AS stacktraces,
|
|
p.*
|
|
FROM system.processes p
|
|
JOIN system.stack_trace s USING (query_id)
|
|
WHERE query NOT LIKE '%system.processes%'
|
|
GROUP BY p.*
|
|
))
|
|
ORDER BY elapsed DESC
|
|
""",
|
|
settings={
|
|
"allow_introspection_functions": 1,
|
|
},
|
|
)
|
|
else:
|
|
return clickhouse_execute_json(
|
|
args,
|
|
"""
|
|
SELECT
|
|
arrayStringConcat(groupArray('Thread ID ' || toString(s.thread_id) || '\n' || arrayStringConcat(arrayMap(
|
|
x -> concat(addressToLine(x), '::', demangle(addressToSymbol(x))),
|
|
s.trace), '\n') AS stacktrace
|
|
)) AS stacktraces,
|
|
p.*
|
|
FROM system.processes p
|
|
JOIN system.stack_trace s USING (query_id)
|
|
WHERE query NOT LIKE '%system.processes%'
|
|
GROUP BY p.*
|
|
ORDER BY elapsed DESC
|
|
""",
|
|
settings={
|
|
"allow_introspection_functions": 1,
|
|
},
|
|
)
|
|
except Exception as e:
|
|
return "Failed to get processlist: " + str(e)
|
|
|
|
|
|
def get_transactions_list(args):
|
|
try:
|
|
if args.replicated_database:
|
|
return clickhouse_execute_json(
|
|
args,
|
|
"SELECT materialize((hostName(), tcpPort())) as host, * FROM "
|
|
"clusterAllReplicas('test_cluster_database_replicated', system.transactions)",
|
|
)
|
|
else:
|
|
return clickhouse_execute_json(args, "select * from system.transactions")
|
|
except Exception as e:
|
|
return f"Cannot get list of transactions: {e}"
|
|
|
|
|
|
# collect server stacktraces using gdb
|
|
def get_stacktraces_from_gdb(server_pid):
|
|
try:
|
|
cmd = f"gdb -batch -ex 'thread apply all backtrace' -p {server_pid}"
|
|
return subprocess.check_output(cmd, shell=True).decode("utf-8")
|
|
except Exception as e:
|
|
print(f"Error occurred while receiving stack traces from gdb: {e}")
|
|
return None
|
|
|
|
|
|
# collect server stacktraces from system.stack_trace table
|
|
# it does not work in Sandbox
|
|
def get_stacktraces_from_clickhouse(args):
|
|
settings_str = " ".join(
|
|
[
|
|
get_additional_client_options(args),
|
|
"--allow_introspection_functions=1",
|
|
"--skip_unavailable_shards=1",
|
|
]
|
|
)
|
|
replicated_msg = (
|
|
f"{args.client} {settings_str} --query "
|
|
'"SELECT materialize((hostName(), tcpPort())) as host, thread_id, '
|
|
"arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
|
|
"arrayMap(x -> addressToLine(x), trace), "
|
|
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
|
|
"FROM clusterAllReplicas('test_cluster_database_replicated', 'system.stack_trace') "
|
|
'ORDER BY host, thread_id FORMAT Vertical"'
|
|
)
|
|
|
|
msg = (
|
|
f"{args.client} {settings_str} --query "
|
|
"\"SELECT arrayStringConcat(arrayMap(x, y -> concat(x, ': ', y), "
|
|
"arrayMap(x -> addressToLine(x), trace), "
|
|
"arrayMap(x -> demangle(addressToSymbol(x)), trace)), '\n') as trace "
|
|
'FROM system.stack_trace FORMAT Vertical"'
|
|
)
|
|
|
|
try:
|
|
return subprocess.check_output(
|
|
replicated_msg if args.replicated_database else msg,
|
|
shell=True,
|
|
stderr=subprocess.STDOUT,
|
|
).decode("utf-8")
|
|
except Exception as e:
|
|
print(f"Error occurred while receiving stack traces from client: {e}")
|
|
return None
|
|
|
|
|
|
def print_stacktraces() -> None:
|
|
server_pid = get_server_pid()
|
|
|
|
bt = None
|
|
|
|
if server_pid and not args.replicated_database:
|
|
print("")
|
|
print(
|
|
f"Located ClickHouse server process {server_pid} listening at TCP port {args.tcp_port}"
|
|
)
|
|
print("Collecting stacktraces from all running threads with gdb:")
|
|
|
|
bt = get_stacktraces_from_gdb(server_pid)
|
|
|
|
if len(bt) < 1000:
|
|
print("Got suspiciously small stacktraces: ", bt)
|
|
bt = None
|
|
|
|
if bt is None:
|
|
print("\nCollecting stacktraces from system.stacktraces table:")
|
|
|
|
bt = get_stacktraces_from_clickhouse(args)
|
|
|
|
if bt is not None:
|
|
print(bt)
|
|
return
|
|
|
|
print(
|
|
colored(
|
|
f"\nUnable to locate ClickHouse server process listening at TCP port "
|
|
f"{args.tcp_port}. It must have crashed or exited prematurely!",
|
|
args,
|
|
"red",
|
|
attrs=["bold"],
|
|
)
|
|
)
|
|
|
|
|
|
def get_server_pid():
|
|
# lsof does not work in stress tests for some reason
|
|
cmd_lsof = f"lsof -i tcp:{args.tcp_port} -s tcp:LISTEN -Fp | sed 's/^p//p;d'"
|
|
cmd_pidof = "pidof -s clickhouse-server"
|
|
|
|
commands = [cmd_lsof, cmd_pidof]
|
|
output = None
|
|
|
|
for cmd in commands:
|
|
try:
|
|
output = subprocess.check_output(
|
|
cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True
|
|
)
|
|
if output:
|
|
return int(output)
|
|
except Exception as e:
|
|
print(f"Cannot get server pid with {cmd}, got {output}: {e}")
|
|
|
|
return None # most likely server is dead
|
|
|
|
|
|
def colored(text, args, color=None, on_color=None, attrs=None):
|
|
if termcolor and (sys.stdout.isatty() or args.force_color):
|
|
return termcolor.colored(text, color, on_color, attrs)
|
|
else:
|
|
return text
|
|
|
|
|
|
class TestStatus(enum.Enum):
|
|
FAIL = "FAIL"
|
|
UNKNOWN = "UNKNOWN"
|
|
OK = "OK"
|
|
SKIPPED = "SKIPPED"
|
|
|
|
|
|
class FailureReason(enum.Enum):
|
|
# FAIL reasons
|
|
TIMEOUT = "Timeout!"
|
|
SERVER_DIED = "server died"
|
|
EXIT_CODE = "return code: "
|
|
STDERR = "having stderror: "
|
|
EXCEPTION = "having having exception in stdout: "
|
|
RESULT_DIFF = "result differs with reference: "
|
|
TOO_LONG = "Test runs too long (> 60s). Make it faster."
|
|
INTERNAL_QUERY_FAIL = "Internal query (CREATE/DROP DATABASE) failed:"
|
|
|
|
# SKIPPED reasons
|
|
DISABLED = "disabled"
|
|
SKIP = "skip"
|
|
NO_JINJA = "no jinja"
|
|
NO_ZOOKEEPER = "no zookeeper"
|
|
NO_SHARD = "no shard"
|
|
FAST_ONLY = "running fast tests only"
|
|
NO_LONG = "not running long tests"
|
|
REPLICATED_DB = "replicated-database"
|
|
S3_STORAGE = "s3-storage"
|
|
BUILD = "not running for current build"
|
|
NO_UPGRADE_CHECK = "not running for upgrade check"
|
|
NO_PARALLEL_REPLICAS = "smth in not supported with parallel replicas"
|
|
|
|
# UNKNOWN reasons
|
|
NO_REFERENCE = "no reference file"
|
|
INTERNAL_ERROR = "Test internal error: "
|
|
|
|
|
|
def threshold_generator(always_on_prob, always_off_prob, min_val, max_val):
|
|
def gen():
|
|
tmp = random.random()
|
|
if tmp <= always_on_prob:
|
|
return min_val
|
|
if tmp <= always_on_prob + always_off_prob:
|
|
return max_val
|
|
|
|
if isinstance(min_val, int) and isinstance(max_val, int):
|
|
return random.randint(min_val, max_val)
|
|
else:
|
|
return random.uniform(min_val, max_val)
|
|
|
|
return gen
|
|
|
|
|
|
class SettingsRandomizer:
|
|
settings = {
|
|
"max_insert_threads": lambda: 0
|
|
if random.random() < 0.5
|
|
else random.randint(1, 16),
|
|
"group_by_two_level_threshold": threshold_generator(0.2, 0.2, 1, 1000000),
|
|
"group_by_two_level_threshold_bytes": threshold_generator(
|
|
0.2, 0.2, 1, 50000000
|
|
),
|
|
"distributed_aggregation_memory_efficient": lambda: random.randint(0, 1),
|
|
"fsync_metadata": lambda: random.randint(0, 1),
|
|
"output_format_parallel_formatting": lambda: random.randint(0, 1),
|
|
"input_format_parallel_parsing": lambda: random.randint(0, 1),
|
|
"min_chunk_bytes_for_parallel_parsing": lambda: max(
|
|
1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))
|
|
),
|
|
"max_read_buffer_size": lambda: random.randint(500000, 1048576),
|
|
"prefer_localhost_replica": lambda: random.randint(0, 1),
|
|
"max_block_size": lambda: random.randint(8000, 100000),
|
|
"max_threads": lambda: random.randint(1, 64),
|
|
"optimize_or_like_chain": lambda: random.randint(0, 1),
|
|
"optimize_read_in_order": lambda: random.randint(0, 1),
|
|
"read_in_order_two_level_merge_threshold": lambda: random.randint(0, 100),
|
|
"optimize_aggregation_in_order": lambda: random.randint(0, 1),
|
|
"aggregation_in_order_max_block_bytes": lambda: random.randint(0, 50000000),
|
|
"min_compress_block_size": lambda: random.randint(1, 1048576 * 3),
|
|
"max_compress_block_size": lambda: random.randint(1, 1048576 * 3),
|
|
"use_uncompressed_cache": lambda: random.randint(0, 1),
|
|
"min_bytes_to_use_direct_io": threshold_generator(
|
|
0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
|
|
),
|
|
"min_bytes_to_use_mmap_io": threshold_generator(
|
|
0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
|
|
),
|
|
"local_filesystem_read_method": lambda: random.choice(
|
|
["read", "pread", "mmap", "pread_threadpool", "io_uring"]
|
|
),
|
|
"remote_filesystem_read_method": lambda: random.choice(["read", "threadpool"]),
|
|
"local_filesystem_read_prefetch": lambda: random.randint(0, 1),
|
|
"remote_filesystem_read_prefetch": lambda: random.randint(0, 1),
|
|
"compile_expressions": lambda: random.randint(0, 1),
|
|
"compile_aggregate_expressions": lambda: random.randint(0, 1),
|
|
"compile_sort_description": lambda: random.randint(0, 1),
|
|
"merge_tree_coarse_index_granularity": lambda: random.randint(2, 32),
|
|
"optimize_distinct_in_order": lambda: random.randint(0, 1),
|
|
"optimize_sorting_by_input_stream_properties": lambda: random.randint(0, 1),
|
|
"enable_memory_bound_merging_of_aggregation_results": lambda: random.randint(
|
|
0, 1
|
|
),
|
|
}
|
|
|
|
@staticmethod
|
|
def get_random_settings():
|
|
random_settings = []
|
|
for setting, generator in SettingsRandomizer.settings.items():
|
|
random_settings.append(f"{setting}={generator()}")
|
|
return random_settings
|
|
|
|
|
|
class MergeTreeSettingsRandomizer:
|
|
settings = {
|
|
"ratio_of_defaults_for_sparse_serialization": threshold_generator(
|
|
0.3, 0.5, 0.0, 1.0
|
|
),
|
|
"prefer_fetch_merged_part_size_threshold": threshold_generator(
|
|
0.2, 0.5, 1, 10 * 1024 * 1024 * 1024
|
|
),
|
|
"vertical_merge_algorithm_min_rows_to_activate": threshold_generator(
|
|
0.4, 0.4, 1, 1000000
|
|
),
|
|
"vertical_merge_algorithm_min_columns_to_activate": threshold_generator(
|
|
0.4, 0.4, 1, 100
|
|
),
|
|
"min_merge_bytes_to_use_direct_io": threshold_generator(
|
|
0.25, 0.25, 1, 10 * 1024 * 1024 * 1024
|
|
),
|
|
"index_granularity_bytes": lambda: random.randint(1024, 30 * 1024 * 1024),
|
|
"merge_max_block_size": lambda: random.randint(1, 8192 * 3),
|
|
"index_granularity": lambda: random.randint(1, 65536),
|
|
"min_bytes_for_wide_part": threshold_generator(0.3, 0.3, 0, 1024 * 1024 * 1024),
|
|
}
|
|
|
|
@staticmethod
|
|
def get_random_settings(args):
|
|
random_settings = []
|
|
for setting, generator in MergeTreeSettingsRandomizer.settings.items():
|
|
if setting not in args.changed_merge_tree_settings:
|
|
random_settings.append(f"{setting}={generator()}")
|
|
return random_settings
|
|
|
|
|
|
class TestResult:
|
|
def __init__(
|
|
self,
|
|
case_name: str,
|
|
status: TestStatus,
|
|
reason: Optional[FailureReason],
|
|
total_time: float,
|
|
description: str,
|
|
):
|
|
self.case_name: str = case_name
|
|
self.status: TestStatus = status
|
|
self.reason: Optional[FailureReason] = reason
|
|
self.total_time: float = total_time
|
|
self.description: str = description
|
|
self.need_retry: bool = False
|
|
|
|
def check_if_need_retry(self, args, stdout, stderr, runs_count):
|
|
if (
|
|
self.status != TestStatus.FAIL
|
|
or not need_retry(args, stdout, stderr, self.total_time)
|
|
or MAX_RETRIES < runs_count
|
|
):
|
|
return
|
|
self.need_retry = True
|
|
|
|
|
|
class TestCase:
|
|
@staticmethod
|
|
def get_description_from_exception_info(exc_info):
|
|
exc_type, exc_value, tb = exc_info
|
|
exc_name = exc_type.__name__
|
|
traceback_str = "\n".join(traceback.format_tb(tb, 10))
|
|
description = f"\n{exc_name}\n{exc_value}\n{traceback_str}"
|
|
return description
|
|
|
|
@staticmethod
|
|
def get_reference_file(suite_dir, name):
|
|
"""
|
|
Returns reference file name for specified test
|
|
"""
|
|
|
|
name = removesuffix(name, ".gen")
|
|
for ext in [".reference", ".gen.reference"]:
|
|
reference_file = os.path.join(suite_dir, name) + ext
|
|
if os.path.isfile(reference_file):
|
|
return reference_file
|
|
return None
|
|
|
|
@staticmethod
|
|
def configure_testcase_args(args, case_file, suite_tmp_dir):
|
|
testcase_args = copy.deepcopy(args)
|
|
|
|
testcase_args.testcase_start_time = datetime.now()
|
|
testcase_basename = os.path.basename(case_file)
|
|
testcase_args.testcase_client = (
|
|
f"{testcase_args.client} --log_comment '{testcase_basename}'"
|
|
)
|
|
testcase_args.testcase_basename = testcase_basename
|
|
|
|
if testcase_args.database:
|
|
database = testcase_args.database
|
|
os.environ.setdefault("CLICKHOUSE_DATABASE", database)
|
|
os.environ.setdefault("CLICKHOUSE_TMP", suite_tmp_dir)
|
|
testcase_args.test_tmp_dir = suite_tmp_dir
|
|
else:
|
|
# If --database is not specified, we will create temporary database with
|
|
# unique name and we will recreate and drop it for each test
|
|
def random_str(length=8):
|
|
alphabet = string.ascii_lowercase + string.digits
|
|
# NOTE: it is important not to use default random generator, since it shares state.
|
|
return "".join(
|
|
random.SystemRandom().choice(alphabet) for _ in range(length)
|
|
)
|
|
|
|
database = f"test_{random_str()}"
|
|
|
|
clickhouse_execute(
|
|
args,
|
|
"CREATE DATABASE IF NOT EXISTS " + database + get_db_engine(testcase_args, database),
|
|
settings=get_create_database_settings(args, testcase_args),
|
|
)
|
|
|
|
os.environ["CLICKHOUSE_DATABASE"] = database
|
|
# Set temporary directory to match the randomly generated database,
|
|
# because .sh tests also use it for temporary files and we want to avoid
|
|
# collisions.
|
|
testcase_args.test_tmp_dir = os.path.join(suite_tmp_dir, database)
|
|
os.mkdir(testcase_args.test_tmp_dir)
|
|
os.environ["CLICKHOUSE_TMP"] = testcase_args.test_tmp_dir
|
|
|
|
testcase_args.testcase_database = database
|
|
|
|
# Printed only in case of failures
|
|
#
|
|
# NOTE: here we use "CLICKHOUSE_TMP" instead of "file_suffix",
|
|
# so it is installed in configure_testcase_args() unlike other files
|
|
# (stdout_file, stderr_file) in TestCase::__init__().
|
|
# Since using CLICKHOUSE_TMP is easier to use in expect.
|
|
testcase_args.debug_log_file = (
|
|
os.path.join(testcase_args.test_tmp_dir, testcase_basename) + ".debuglog"
|
|
)
|
|
|
|
return testcase_args
|
|
|
|
@staticmethod
|
|
def cli_format_settings(settings_list) -> str:
|
|
return " ".join([f"--{setting}" for setting in settings_list])
|
|
|
|
def has_show_create_table_in_test(self):
|
|
return not subprocess.call(["grep", "-iq", "show create", self.case_file])
|
|
|
|
def add_random_settings(self, client_options):
|
|
new_options = ""
|
|
if self.randomize_settings:
|
|
if len(self.base_url_params) == 0:
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] = "&".join(self.random_settings)
|
|
else:
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] = (
|
|
self.base_url_params + "&" + "&".join(self.random_settings)
|
|
)
|
|
|
|
new_options += f" {self.cli_format_settings(self.random_settings)}"
|
|
|
|
if self.randomize_merge_tree_settings:
|
|
new_options += f" --allow_merge_tree_settings {self.cli_format_settings(self.merge_tree_random_settings)}"
|
|
|
|
if new_options != "":
|
|
new_options += " --allow_repeated_settings"
|
|
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] = (
|
|
self.base_client_options + new_options + " "
|
|
)
|
|
|
|
return client_options + new_options
|
|
|
|
def remove_random_settings_from_env(self):
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] = self.base_url_params
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] = self.base_client_options
|
|
|
|
def add_info_about_settings(self, description):
|
|
if self.randomize_settings:
|
|
description += f"\nSettings used in the test: {self.cli_format_settings(self.random_settings)}"
|
|
if self.randomize_merge_tree_settings:
|
|
description += f"\n\nMergeTree settings used in test: {self.cli_format_settings(self.merge_tree_random_settings)}"
|
|
|
|
return description + "\n"
|
|
|
|
def __init__(self, suite, case: str, args, is_concurrent: bool):
|
|
self.case: str = case # case file name
|
|
self.tags: Set[str] = suite.all_tags[case] if case in suite.all_tags else set()
|
|
|
|
for tag in os.getenv("GLOBAL_TAGS", "").split(","):
|
|
self.tags.add(tag.strip())
|
|
|
|
self.case_file: str = os.path.join(suite.suite_path, case)
|
|
(self.name, self.ext) = os.path.splitext(case)
|
|
|
|
file_suffix = f".{os.getpid()}" if is_concurrent and args.test_runs > 1 else ""
|
|
self.reference_file = self.get_reference_file(suite.suite_path, self.name)
|
|
self.stdout_file = (
|
|
os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stdout"
|
|
)
|
|
self.stderr_file = (
|
|
os.path.join(suite.suite_tmp_path, self.name) + file_suffix + ".stderr"
|
|
)
|
|
|
|
self.testcase_args = None
|
|
self.runs_count = 0
|
|
|
|
has_no_random_settings_tag = self.tags and "no-random-settings" in self.tags
|
|
|
|
self.randomize_settings = not (
|
|
args.no_random_settings or has_no_random_settings_tag
|
|
)
|
|
|
|
has_no_random_merge_tree_settings_tag = (
|
|
self.tags and "no-random-merge-tree-settings" in self.tags
|
|
)
|
|
|
|
# If test contains SHOW CREATE TABLE do not
|
|
# randomize merge tree settings, because
|
|
# they will be added to table definition and test will fail
|
|
self.randomize_merge_tree_settings = not (
|
|
args.no_random_merge_tree_settings
|
|
or has_no_random_settings_tag
|
|
or has_no_random_merge_tree_settings_tag
|
|
or self.has_show_create_table_in_test()
|
|
)
|
|
|
|
if self.randomize_settings:
|
|
self.random_settings = SettingsRandomizer.get_random_settings()
|
|
|
|
if self.randomize_merge_tree_settings:
|
|
self.merge_tree_random_settings = (
|
|
MergeTreeSettingsRandomizer.get_random_settings(args)
|
|
)
|
|
|
|
self.base_url_params = (
|
|
os.environ["CLICKHOUSE_URL_PARAMS"]
|
|
if "CLICKHOUSE_URL_PARAMS" in os.environ
|
|
else ""
|
|
)
|
|
|
|
self.base_client_options = (
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"]
|
|
if "CLICKHOUSE_CLIENT_OPT" in os.environ
|
|
else ""
|
|
)
|
|
|
|
# should skip test, should increment skipped_total, skip reason
|
|
def should_skip_test(self, suite) -> Optional[FailureReason]:
|
|
tags = self.tags
|
|
|
|
if tags and ("disabled" in tags) and not args.disabled:
|
|
return FailureReason.DISABLED
|
|
|
|
elif (
|
|
os.path.exists(os.path.join(suite.suite_path, self.name) + ".disabled")
|
|
and not args.disabled
|
|
):
|
|
return FailureReason.DISABLED
|
|
|
|
elif "no-parallel-replicas" in tags and args.no_parallel_replicas:
|
|
return FailureReason.NO_PARALLEL_REPLICAS
|
|
|
|
elif args.skip and any(s in self.name for s in args.skip):
|
|
return FailureReason.SKIP
|
|
|
|
elif not USE_JINJA and self.ext.endswith("j2"):
|
|
return FailureReason.NO_JINJA
|
|
|
|
elif (
|
|
tags
|
|
and (("zookeeper" in tags) or ("replica" in tags))
|
|
and not args.zookeeper
|
|
):
|
|
return FailureReason.NO_ZOOKEEPER
|
|
|
|
elif (
|
|
tags
|
|
and (("shard" in tags) or ("distributed" in tags) or ("global" in tags))
|
|
and not args.shard
|
|
):
|
|
return FailureReason.NO_SHARD
|
|
|
|
elif tags and ("no-fasttest" in tags) and args.fast_tests_only:
|
|
return FailureReason.FAST_ONLY
|
|
|
|
elif (
|
|
tags
|
|
and (("long" in tags) or ("deadlock" in tags) or ("race" in tags))
|
|
and args.no_long
|
|
):
|
|
# Tests for races and deadlocks usually are run in a loop for a significant amount of time
|
|
return FailureReason.NO_LONG
|
|
|
|
elif tags and ("no-replicated-database" in tags) and args.replicated_database:
|
|
return FailureReason.REPLICATED_DB
|
|
|
|
# TODO: remove checking "no-upgrade-check" after 23.1
|
|
elif args.upgrade_check and (
|
|
"no-upgrade-check" in tags or "no-upgrade-check" in tags):
|
|
return FailureReason.NO_UPGRADE_CHECK
|
|
|
|
elif tags and ("no-s3-storage" in tags) and args.s3_storage:
|
|
return FailureReason.S3_STORAGE
|
|
|
|
elif tags:
|
|
for build_flag in args.build_flags:
|
|
if "no-" + build_flag in tags:
|
|
return FailureReason.BUILD
|
|
for tag in tags:
|
|
tag = tag.replace("-", "_")
|
|
if tag.startswith("use_") and tag not in args.build_flags:
|
|
return FailureReason.BUILD
|
|
|
|
return None
|
|
|
|
def process_result_impl(
|
|
self, proc, stdout: str, stderr: str, debug_log: str, total_time: float
|
|
):
|
|
description = ""
|
|
|
|
debug_log = trim_for_log(debug_log)
|
|
|
|
if proc:
|
|
if proc.returncode is None:
|
|
try:
|
|
proc.kill()
|
|
except OSError as e:
|
|
if e.errno != ESRCH:
|
|
raise
|
|
|
|
if stderr:
|
|
description += stderr
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.TIMEOUT,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if proc.returncode != 0:
|
|
reason = FailureReason.EXIT_CODE
|
|
description += str(proc.returncode)
|
|
|
|
if stderr:
|
|
description += "\n"
|
|
description += stderr
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
|
|
# Stop on fatal errors like segmentation fault. They are sent to client via logs.
|
|
if " <Fatal> " in stderr:
|
|
reason = FailureReason.SERVER_DIED
|
|
|
|
if (
|
|
self.testcase_args.stop
|
|
and (
|
|
"Connection refused" in stderr
|
|
or "Attempt to read after eof" in stderr
|
|
)
|
|
and "Received exception from server" not in stderr
|
|
):
|
|
reason = FailureReason.SERVER_DIED
|
|
|
|
if os.path.isfile(self.stdout_file):
|
|
description += ", result:\n\n"
|
|
description += trim_for_log(open(self.stdout_file).read())
|
|
description += "\n"
|
|
|
|
description += f"\nstdout:\n{stdout}\n"
|
|
return TestResult(
|
|
self.name, TestStatus.FAIL, reason, total_time, description
|
|
)
|
|
|
|
if stderr:
|
|
description += "\n"
|
|
description += trim_for_log(stderr)
|
|
description += "\n"
|
|
description += "\nstdout:\n"
|
|
description += trim_for_log(stdout)
|
|
description += "\n"
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.STDERR,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if "Exception" in stdout:
|
|
description += "\n"
|
|
description += trim_for_log(stdout)
|
|
description += "\n"
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.EXCEPTION,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if "@@SKIP@@" in stdout:
|
|
skip_reason = stdout.replace("@@SKIP@@", "").rstrip("\n")
|
|
description += " - "
|
|
description += skip_reason
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.SKIPPED,
|
|
FailureReason.SKIP,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if self.reference_file is None:
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.UNKNOWN,
|
|
FailureReason.NO_REFERENCE,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
result_is_different = subprocess.call(
|
|
["diff", "-q", self.reference_file, self.stdout_file], stdout=PIPE
|
|
)
|
|
|
|
if result_is_different:
|
|
diff = Popen(
|
|
[
|
|
"diff",
|
|
"-U",
|
|
str(self.testcase_args.unified),
|
|
self.reference_file,
|
|
self.stdout_file,
|
|
],
|
|
stdout=PIPE,
|
|
universal_newlines=True,
|
|
).communicate()[0]
|
|
if diff.startswith("Binary files "):
|
|
diff += "Content of stdout:\n===================\n"
|
|
file = open(self.stdout_file, "r")
|
|
diff += str(file.read())
|
|
file.close()
|
|
diff += "==================="
|
|
description += f"\n{diff}\n"
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.RESULT_DIFF,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if (
|
|
self.testcase_args.test_runs > 1
|
|
and total_time > 120
|
|
and "long" not in self.tags
|
|
):
|
|
if debug_log:
|
|
description += "\n"
|
|
description += debug_log
|
|
# We're in Flaky Check mode, check the run time as well while we're at it.
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.TOO_LONG,
|
|
total_time,
|
|
description,
|
|
)
|
|
|
|
if os.path.exists(self.stdout_file):
|
|
os.remove(self.stdout_file)
|
|
if os.path.exists(self.stderr_file):
|
|
os.remove(self.stderr_file)
|
|
if os.path.exists(self.testcase_args.debug_log_file):
|
|
os.remove(self.testcase_args.debug_log_file)
|
|
|
|
return TestResult(self.name, TestStatus.OK, None, total_time, description)
|
|
|
|
@staticmethod
|
|
def print_test_time(test_time) -> str:
|
|
if args.print_time:
|
|
return f" {test_time:.2f} sec."
|
|
else:
|
|
return ""
|
|
|
|
def process_result(self, result: TestResult, messages):
|
|
description_full = messages[result.status]
|
|
description_full += self.print_test_time(result.total_time)
|
|
if result.reason is not None:
|
|
description_full += " - "
|
|
description_full += result.reason.value
|
|
|
|
description_full += result.description
|
|
description_full += "\n"
|
|
|
|
if result.status == TestStatus.FAIL and self.testcase_args:
|
|
description_full += "Database: " + self.testcase_args.testcase_database
|
|
|
|
result.description = description_full
|
|
return result
|
|
|
|
@staticmethod
|
|
def send_test_name_failed(suite: str, case: str):
|
|
pid = os.getpid()
|
|
clickhouse_execute(args, f"SELECT 'Running test {suite}/{case} from pid={pid}'", retry_error_codes=True)
|
|
|
|
def run_single_test(
|
|
self, server_logs_level, client_options
|
|
) -> Tuple[Optional[Popen], str, str, str, float]:
|
|
args = self.testcase_args
|
|
client = args.testcase_client
|
|
start_time = args.testcase_start_time
|
|
database = args.testcase_database
|
|
|
|
# This is for .sh tests
|
|
os.environ["CLICKHOUSE_LOG_COMMENT"] = args.testcase_basename
|
|
|
|
params = {
|
|
"client": client + " --database=" + database,
|
|
"logs_level": server_logs_level,
|
|
"options": client_options,
|
|
"test": self.case_file,
|
|
"stdout": self.stdout_file,
|
|
"stderr": self.stderr_file,
|
|
"secure": "--secure" if args.secure else "",
|
|
}
|
|
|
|
# >> append to stderr (but not stdout since it is not used there),
|
|
# because there are also output of per test database creation
|
|
if not args.database:
|
|
pattern = "{test} > {stdout} 2> {stderr}"
|
|
else:
|
|
pattern = "{test} > {stdout} 2> {stderr}"
|
|
|
|
if self.ext == ".sql":
|
|
pattern = (
|
|
"{client} --send_logs_level={logs_level} {secure} --multiquery {options} < "
|
|
+ pattern
|
|
)
|
|
|
|
command = pattern.format(**params)
|
|
|
|
proc = Popen(command, shell=True, env=os.environ)
|
|
|
|
while (
|
|
datetime.now() - start_time
|
|
).total_seconds() < args.timeout and proc.poll() is None:
|
|
sleep(0.01)
|
|
|
|
need_drop_database = not args.database
|
|
if need_drop_database and args.no_drop_if_fail:
|
|
maybe_passed = (
|
|
(proc.returncode == 0)
|
|
and (proc.stderr is None)
|
|
and (proc.stdout is None or "Exception" not in proc.stdout)
|
|
)
|
|
need_drop_database = maybe_passed
|
|
|
|
debug_log = ""
|
|
if os.path.exists(self.testcase_args.debug_log_file):
|
|
with open(self.testcase_args.debug_log_file, "rb") as stream:
|
|
debug_log += self.testcase_args.debug_log_file + ":\n"
|
|
debug_log += str(stream.read(), errors="replace", encoding="utf-8")
|
|
debug_log += "\n"
|
|
|
|
if need_drop_database:
|
|
seconds_left = max(
|
|
args.timeout - (datetime.now() - start_time).total_seconds(), 20
|
|
)
|
|
drop_database_query = "DROP DATABASE IF EXISTS " + database
|
|
if args.replicated_database:
|
|
drop_database_query += " ON CLUSTER test_cluster_database_replicated"
|
|
|
|
try:
|
|
# It's possible to get an error "New table appeared in database being dropped or detached. Try again."
|
|
for _ in range(1, 60):
|
|
try:
|
|
clickhouse_execute(
|
|
args,
|
|
drop_database_query,
|
|
timeout=seconds_left,
|
|
settings={
|
|
"log_comment": args.testcase_basename,
|
|
},
|
|
)
|
|
except HTTPError as e:
|
|
if need_retry(args, e.message, e.message, 0):
|
|
continue
|
|
raise
|
|
break
|
|
|
|
except socket.timeout:
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
return (
|
|
None,
|
|
"",
|
|
f"Timeout dropping database {database} after test",
|
|
debug_log,
|
|
total_time,
|
|
)
|
|
shutil.rmtree(args.test_tmp_dir)
|
|
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
|
|
# Normalize randomized database names in stdout, stderr files.
|
|
os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stdout_file}")
|
|
if args.hide_db_name:
|
|
os.system(f"LC_ALL=C sed -i -e 's/{database}/default/g' {self.stderr_file}")
|
|
if args.replicated_database:
|
|
os.system(f"LC_ALL=C sed -i -e 's|/auto_{{shard}}||g' {self.stdout_file}")
|
|
os.system(f"LC_ALL=C sed -i -e 's|auto_{{replica}}||g' {self.stdout_file}")
|
|
|
|
# Normalize hostname in stdout file.
|
|
os.system(
|
|
f"LC_ALL=C sed -i -e 's/{socket.gethostname()}/localhost/g' {self.stdout_file}"
|
|
)
|
|
|
|
stdout = ""
|
|
if os.path.exists(self.stdout_file):
|
|
with open(self.stdout_file, "rb") as stdfd:
|
|
stdout = str(stdfd.read(), errors="replace", encoding="utf-8")
|
|
|
|
stderr = ""
|
|
if os.path.exists(self.stderr_file):
|
|
with open(self.stderr_file, "rb") as stdfd:
|
|
stderr += str(stdfd.read(), errors="replace", encoding="utf-8")
|
|
|
|
return proc, stdout, stderr, debug_log, total_time
|
|
|
|
def run(self, args, suite, client_options, server_logs_level):
|
|
try:
|
|
skip_reason = self.should_skip_test(suite)
|
|
if skip_reason is not None:
|
|
return TestResult(self.name, TestStatus.SKIPPED, skip_reason, 0.0, "")
|
|
|
|
if args.testname:
|
|
try:
|
|
self.send_test_name_failed(suite.suite, self.case)
|
|
except Exception:
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.SERVER_DIED,
|
|
0.0,
|
|
"\nServer does not respond to health check\n",
|
|
)
|
|
|
|
self.runs_count += 1
|
|
self.testcase_args = self.configure_testcase_args(
|
|
args, self.case_file, suite.suite_tmp_path
|
|
)
|
|
client_options = self.add_random_settings(client_options)
|
|
proc, stdout, stderr, debug_log, total_time = self.run_single_test(
|
|
server_logs_level, client_options
|
|
)
|
|
|
|
result = self.process_result_impl(
|
|
proc, stdout, stderr, debug_log, total_time
|
|
)
|
|
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
|
|
# to avoid breaking CSV parser
|
|
result.description = result.description.replace("\0", "")
|
|
|
|
if result.status == TestStatus.FAIL:
|
|
result.description = self.add_info_about_settings(result.description)
|
|
return result
|
|
except KeyboardInterrupt as e:
|
|
raise e
|
|
except HTTPError:
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.INTERNAL_QUERY_FAIL,
|
|
0.0,
|
|
self.add_info_about_settings(
|
|
self.get_description_from_exception_info(sys.exc_info())
|
|
),
|
|
)
|
|
except (ConnectionError, http.client.ImproperConnectionState):
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.FAIL,
|
|
FailureReason.SERVER_DIED,
|
|
0.0,
|
|
self.add_info_about_settings(
|
|
self.get_description_from_exception_info(sys.exc_info())
|
|
),
|
|
)
|
|
except Exception:
|
|
return TestResult(
|
|
self.name,
|
|
TestStatus.UNKNOWN,
|
|
FailureReason.INTERNAL_ERROR,
|
|
0.0,
|
|
self.get_description_from_exception_info(sys.exc_info()),
|
|
)
|
|
finally:
|
|
self.remove_random_settings_from_env()
|
|
|
|
|
|
class TestSuite:
|
|
@staticmethod
|
|
def tests_in_suite_key_func(item: str) -> float:
|
|
if args.order == "random":
|
|
return random.random()
|
|
|
|
reverse = 1 if args.order == "asc" else -1
|
|
|
|
if -1 == item.find("_"):
|
|
return 99998
|
|
|
|
prefix, _ = item.split("_", 1)
|
|
|
|
try:
|
|
return reverse * int(prefix)
|
|
except ValueError:
|
|
return 99997
|
|
|
|
@staticmethod
|
|
def render_test_template(j2env, suite_dir, test_name):
|
|
"""
|
|
Render template for test and reference file if needed
|
|
"""
|
|
|
|
if j2env is None:
|
|
return test_name
|
|
|
|
test_base_name = removesuffix(test_name, ".sql.j2", ".sql")
|
|
|
|
reference_file_name = test_base_name + ".reference.j2"
|
|
reference_file_path = os.path.join(suite_dir, reference_file_name)
|
|
if os.path.isfile(reference_file_path):
|
|
tpl = j2env.get_template(reference_file_name)
|
|
tpl.stream().dump(
|
|
os.path.join(suite_dir, test_base_name) + ".gen.reference"
|
|
)
|
|
|
|
if test_name.endswith(".sql.j2"):
|
|
tpl = j2env.get_template(test_name)
|
|
generated_test_name = test_base_name + ".gen.sql"
|
|
tpl.stream().dump(os.path.join(suite_dir, generated_test_name))
|
|
return generated_test_name
|
|
|
|
return test_name
|
|
|
|
@staticmethod
|
|
def read_test_tags(suite_dir: str, all_tests: List[str]) -> Dict[str, Set[str]]:
|
|
def get_comment_sign(filename):
|
|
if filename.endswith(".sql") or filename.endswith(".sql.j2"):
|
|
return "--"
|
|
elif (
|
|
filename.endswith(".sh")
|
|
or filename.endswith(".py")
|
|
or filename.endswith(".expect")
|
|
):
|
|
return "#"
|
|
else:
|
|
raise Exception(f"Unknown file_extension: {filename}")
|
|
|
|
def parse_tags_from_line(line, comment_sign):
|
|
if not line.startswith(comment_sign):
|
|
return None
|
|
tags_str = line[len(comment_sign) :].lstrip() # noqa: ignore E203
|
|
tags_prefix = "Tags:"
|
|
if not tags_str.startswith(tags_prefix):
|
|
return None
|
|
tags_str = tags_str[len(tags_prefix) :] # noqa: ignore E203
|
|
tags = tags_str.split(",")
|
|
tags = {tag.strip() for tag in tags}
|
|
return tags
|
|
|
|
def is_shebang(line: str) -> bool:
|
|
return line.startswith("#!")
|
|
|
|
def find_tag_line(file):
|
|
for line in file:
|
|
line = line.strip()
|
|
if line and not is_shebang(line):
|
|
return line
|
|
return ""
|
|
|
|
def load_tags_from_file(filepath):
|
|
comment_sign = get_comment_sign(filepath)
|
|
with open(filepath, "r", encoding="utf-8") as file:
|
|
try:
|
|
line = find_tag_line(file)
|
|
except UnicodeDecodeError:
|
|
return []
|
|
return parse_tags_from_line(line, comment_sign)
|
|
|
|
all_tags = {}
|
|
start_time = datetime.now()
|
|
for test_name in all_tests:
|
|
tags = load_tags_from_file(os.path.join(suite_dir, test_name))
|
|
if tags:
|
|
all_tags[test_name] = tags
|
|
elapsed = (datetime.now() - start_time).total_seconds()
|
|
if elapsed > 1:
|
|
print(f"Tags for suite {suite_dir} read in {elapsed:.2f} seconds")
|
|
return all_tags
|
|
|
|
def __init__(self, args, suite_path: str, suite_tmp_path: str, suite: str):
|
|
self.args = args
|
|
self.suite_path: str = suite_path
|
|
self.suite_tmp_path: str = suite_tmp_path
|
|
self.suite: str = suite
|
|
|
|
filter_func = lambda x: True # noqa: ignore E731
|
|
|
|
if args.run_by_hash_num is not None and args.run_by_hash_total is not None:
|
|
if args.run_by_hash_num > args.run_by_hash_total:
|
|
raise Exception(
|
|
f"Incorrect run by hash, value {args.run_by_hash_num} bigger than total {args.run_by_hash_total}"
|
|
)
|
|
|
|
filter_func = (
|
|
lambda x: stringhash(x) % args.run_by_hash_total == args.run_by_hash_num
|
|
)
|
|
|
|
self.all_tests: List[str] = self.get_tests_list(
|
|
self.tests_in_suite_key_func, filter_func
|
|
)
|
|
self.all_tags: Dict[str, Set[str]] = self.read_test_tags(
|
|
self.suite_path, self.all_tests
|
|
)
|
|
|
|
self.sequential_tests = []
|
|
self.parallel_tests = []
|
|
for test_name in self.all_tests:
|
|
if self.is_sequential_test(test_name):
|
|
self.sequential_tests.append(test_name)
|
|
else:
|
|
self.parallel_tests.append(test_name)
|
|
|
|
def is_sequential_test(self, test_name):
|
|
if args.sequential:
|
|
if any(s in test_name for s in args.sequential):
|
|
return True
|
|
|
|
if test_name not in self.all_tags:
|
|
return False
|
|
|
|
return ("no-parallel" in self.all_tags[test_name]) or (
|
|
"sequential" in self.all_tags[test_name]
|
|
)
|
|
|
|
def get_tests_list(self, sort_key, filter_func):
|
|
"""
|
|
Return list of tests file names to run
|
|
"""
|
|
|
|
all_tests = list(self.get_selected_tests(filter_func))
|
|
all_tests = all_tests * self.args.test_runs
|
|
all_tests.sort(key=sort_key)
|
|
return all_tests
|
|
|
|
def get_selected_tests(self, filter_func):
|
|
"""
|
|
Find all files with tests, filter, render templates
|
|
"""
|
|
|
|
j2env = (
|
|
jinja2.Environment(
|
|
loader=jinja2.FileSystemLoader(self.suite_path),
|
|
keep_trailing_newline=True,
|
|
)
|
|
if USE_JINJA
|
|
else None
|
|
)
|
|
|
|
for test_name in os.listdir(self.suite_path):
|
|
if not is_test_from_dir(self.suite_path, test_name):
|
|
continue
|
|
if self.args.test and not any(
|
|
re.search(pattern, test_name) for pattern in self.args.test
|
|
):
|
|
continue
|
|
if USE_JINJA and test_name.endswith(".gen.sql"):
|
|
continue
|
|
if not filter_func(test_name):
|
|
continue
|
|
test_name = self.render_test_template(j2env, self.suite_path, test_name)
|
|
yield test_name
|
|
|
|
@staticmethod
|
|
def read_test_suite(args, suite_dir_name: str):
|
|
def is_data_present():
|
|
try:
|
|
return int(clickhouse_execute(args, "EXISTS TABLE test.hits"))
|
|
except Exception as e:
|
|
print(
|
|
"Cannot check if dataset is available, assuming it's not: ", str(e)
|
|
)
|
|
return False
|
|
|
|
base_dir = os.path.abspath(args.queries)
|
|
tmp_dir = os.path.abspath(args.tmp)
|
|
suite_path = os.path.join(base_dir, suite_dir_name)
|
|
|
|
suite_re_obj = re.search("^[0-9]+_(.*)$", suite_dir_name)
|
|
if not suite_re_obj: # skip .gitignore and so on
|
|
return None
|
|
|
|
suite_tmp_path = os.path.join(tmp_dir, suite_dir_name)
|
|
if not os.path.exists(suite_tmp_path):
|
|
os.makedirs(suite_tmp_path)
|
|
|
|
suite = suite_re_obj.group(1)
|
|
|
|
if not os.path.isdir(suite_path):
|
|
return None
|
|
|
|
if "stateful" in suite and not args.no_stateful and not is_data_present():
|
|
print("Won't run stateful tests because test data wasn't loaded.")
|
|
return None
|
|
if "stateless" in suite and args.no_stateless:
|
|
print("Won't run stateless tests because they were manually disabled.")
|
|
return None
|
|
if "stateful" in suite and args.no_stateful:
|
|
print("Won't run stateful tests because they were manually disabled.")
|
|
return None
|
|
|
|
return TestSuite(args, suite_path, suite_tmp_path, suite)
|
|
|
|
|
|
stop_time = None
|
|
exit_code = None
|
|
server_died = None
|
|
stop_tests_triggered_lock = None
|
|
stop_tests_triggered = None
|
|
queue = None
|
|
multiprocessing_manager = None
|
|
restarted_tests = None
|
|
|
|
|
|
def run_tests_array(all_tests_with_params: Tuple[List[str], int, TestSuite]):
|
|
all_tests, num_tests, test_suite = all_tests_with_params
|
|
global stop_time
|
|
global exit_code
|
|
global server_died
|
|
global restarted_tests
|
|
|
|
OP_SQUARE_BRACKET = colored("[", args, attrs=["bold"])
|
|
CL_SQUARE_BRACKET = colored("]", args, attrs=["bold"])
|
|
|
|
MSG_FAIL = (
|
|
OP_SQUARE_BRACKET
|
|
+ colored(" FAIL ", args, "red", attrs=["bold"])
|
|
+ CL_SQUARE_BRACKET
|
|
)
|
|
MSG_UNKNOWN = (
|
|
OP_SQUARE_BRACKET
|
|
+ colored(" UNKNOWN ", args, "yellow", attrs=["bold"])
|
|
+ CL_SQUARE_BRACKET
|
|
)
|
|
MSG_OK = (
|
|
OP_SQUARE_BRACKET
|
|
+ colored(" OK ", args, "green", attrs=["bold"])
|
|
+ CL_SQUARE_BRACKET
|
|
)
|
|
MSG_SKIPPED = (
|
|
OP_SQUARE_BRACKET
|
|
+ colored(" SKIPPED ", args, "cyan", attrs=["bold"])
|
|
+ CL_SQUARE_BRACKET
|
|
)
|
|
|
|
MESSAGES = {
|
|
TestStatus.FAIL: MSG_FAIL,
|
|
TestStatus.UNKNOWN: MSG_UNKNOWN,
|
|
TestStatus.OK: MSG_OK,
|
|
TestStatus.SKIPPED: MSG_SKIPPED,
|
|
}
|
|
|
|
passed_total = 0
|
|
skipped_total = 0
|
|
failures_total = 0
|
|
failures_chain = 0
|
|
start_time = datetime.now()
|
|
|
|
is_concurrent = multiprocessing.current_process().name != "MainProcess"
|
|
|
|
client_options = get_additional_client_options(args)
|
|
|
|
if num_tests > 0:
|
|
about = "about " if is_concurrent else ""
|
|
proc_name = multiprocessing.current_process().name
|
|
print(f"\nRunning {about}{num_tests} {test_suite.suite} tests ({proc_name}).\n")
|
|
|
|
while True:
|
|
if is_concurrent:
|
|
case = queue.get(timeout=args.timeout * 1.1)
|
|
if not case:
|
|
break
|
|
else:
|
|
if all_tests:
|
|
case = all_tests.pop(0)
|
|
else:
|
|
break
|
|
|
|
if server_died.is_set():
|
|
stop_tests()
|
|
break
|
|
|
|
if stop_time and time() > stop_time:
|
|
print("\nStop tests run because global time limit is exceeded.\n")
|
|
stop_tests()
|
|
break
|
|
|
|
test_case = TestCase(test_suite, case, args, is_concurrent)
|
|
|
|
try:
|
|
description = ""
|
|
test_cace_name = removesuffix(test_case.name, ".gen", ".sql") + ": "
|
|
if not is_concurrent:
|
|
sys.stdout.flush()
|
|
sys.stdout.write(f"{test_cace_name:72}")
|
|
# This flush is needed so you can see the test name of the long
|
|
# running test before it will finish. But don't do it in parallel
|
|
# mode, so that the lines don't mix.
|
|
sys.stdout.flush()
|
|
else:
|
|
description = f"{test_cace_name:72}"
|
|
|
|
while True:
|
|
test_result = test_case.run(
|
|
args, test_suite, client_options, server_logs_level
|
|
)
|
|
test_result = test_case.process_result(test_result, MESSAGES)
|
|
if not test_result.need_retry:
|
|
break
|
|
restarted_tests.append(test_result)
|
|
|
|
# First print the description, than invoke the check result logic
|
|
description += test_result.description
|
|
|
|
if description and not description.endswith("\n"):
|
|
description += "\n"
|
|
|
|
sys.stdout.write(description)
|
|
sys.stdout.flush()
|
|
|
|
if test_result.status == TestStatus.OK:
|
|
passed_total += 1
|
|
failures_chain = 0
|
|
elif test_result.status == TestStatus.FAIL:
|
|
failures_total += 1
|
|
failures_chain += 1
|
|
if test_result.reason == FailureReason.SERVER_DIED:
|
|
server_died.set()
|
|
stop_tests()
|
|
elif test_result.status == TestStatus.SKIPPED:
|
|
skipped_total += 1
|
|
|
|
except KeyboardInterrupt as e:
|
|
print(colored("Break tests execution", args, "red"))
|
|
stop_tests()
|
|
raise e
|
|
|
|
if failures_chain >= args.max_failures_chain:
|
|
stop_tests()
|
|
break
|
|
|
|
if failures_total > 0:
|
|
print(
|
|
colored(
|
|
f"\nHaving {failures_total} errors! {passed_total} tests passed."
|
|
f" {skipped_total} tests skipped."
|
|
f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
|
|
f" ({multiprocessing.current_process().name}).",
|
|
args,
|
|
"red",
|
|
attrs=["bold"],
|
|
)
|
|
)
|
|
exit_code.value = 1
|
|
else:
|
|
print(
|
|
colored(
|
|
f"\n{passed_total} tests passed. {skipped_total} tests skipped."
|
|
f" {(datetime.now() - start_time).total_seconds():.2f} s elapsed"
|
|
f" ({multiprocessing.current_process().name}).",
|
|
args,
|
|
"green",
|
|
attrs=["bold"],
|
|
)
|
|
)
|
|
|
|
sys.stdout.flush()
|
|
|
|
|
|
server_logs_level = "warning"
|
|
|
|
|
|
def check_server_started(args):
|
|
print("Connecting to ClickHouse server...", end="")
|
|
|
|
sys.stdout.flush()
|
|
retry_count = args.server_check_retries
|
|
while retry_count > 0:
|
|
try:
|
|
clickhouse_execute(args, "SELECT 1", max_http_retries=1)
|
|
print(" OK")
|
|
sys.stdout.flush()
|
|
return True
|
|
except (ConnectionError, http.client.ImproperConnectionState) as e:
|
|
if args.hung_check:
|
|
print("Connection error, will retry: ", str(e))
|
|
else:
|
|
print(".", end="")
|
|
sys.stdout.flush()
|
|
retry_count -= 1
|
|
sleep(0.5)
|
|
continue
|
|
except TimeoutError:
|
|
print("\nConnection timeout, will not retry")
|
|
break
|
|
except Exception as e:
|
|
print(
|
|
"\nUexpected exception, will not retry: ",
|
|
type(e).__name__,
|
|
": ",
|
|
str(e),
|
|
)
|
|
break
|
|
|
|
print("\nAll connection tries failed")
|
|
sys.stdout.flush()
|
|
return False
|
|
|
|
|
|
class BuildFlags:
|
|
THREAD = "tsan"
|
|
ADDRESS = "asan"
|
|
UNDEFINED = "ubsan"
|
|
MEMORY = "msan"
|
|
DEBUG = "debug"
|
|
RELEASE = "release"
|
|
ORDINARY_DATABASE = "ordinary-database"
|
|
POLYMORPHIC_PARTS = "polymorphic-parts"
|
|
|
|
|
|
def collect_build_flags(args):
|
|
result = []
|
|
|
|
value = clickhouse_execute(
|
|
args, "SELECT value FROM system.build_options WHERE name = 'CXX_FLAGS'"
|
|
)
|
|
if b"-fsanitize=thread" in value:
|
|
result.append(BuildFlags.THREAD)
|
|
elif b"-fsanitize=address" in value:
|
|
result.append(BuildFlags.ADDRESS)
|
|
elif b"-fsanitize=undefined" in value:
|
|
result.append(BuildFlags.UNDEFINED)
|
|
elif b"-fsanitize=memory" in value:
|
|
result.append(BuildFlags.MEMORY)
|
|
|
|
value = clickhouse_execute(
|
|
args, "SELECT value FROM system.build_options WHERE name = 'BUILD_TYPE'"
|
|
)
|
|
if b"Debug" in value:
|
|
result.append(BuildFlags.DEBUG)
|
|
elif b"RelWithDebInfo" in value or b"Release" in value:
|
|
result.append(BuildFlags.RELEASE)
|
|
|
|
value = clickhouse_execute(
|
|
args,
|
|
"SELECT value FROM system.settings WHERE name = 'allow_deprecated_database_ordinary'",
|
|
)
|
|
if value == b"1" or args.db_engine == "Ordinary":
|
|
result.append(BuildFlags.ORDINARY_DATABASE)
|
|
|
|
value = int(
|
|
clickhouse_execute(
|
|
args,
|
|
"SELECT value FROM system.merge_tree_settings WHERE name = 'min_bytes_for_wide_part'",
|
|
)
|
|
)
|
|
if value == 0:
|
|
result.append(BuildFlags.POLYMORPHIC_PARTS)
|
|
|
|
use_flags = clickhouse_execute(
|
|
args,
|
|
"SELECT name FROM system.build_options WHERE name like 'USE_%' AND value in ('ON', '1')",
|
|
)
|
|
for use_flag in use_flags.strip().splitlines():
|
|
use_flag = use_flag.decode().lower()
|
|
result.append(use_flag)
|
|
|
|
system_processor = clickhouse_execute(
|
|
args,
|
|
"SELECT value FROM system.build_options WHERE name = 'SYSTEM_PROCESSOR' LIMIT 1",
|
|
).strip()
|
|
if system_processor:
|
|
result.append(f"cpu-{system_processor.decode().lower()}")
|
|
|
|
return result
|
|
|
|
|
|
def collect_changed_merge_tree_settings(args):
|
|
changed_settings = (
|
|
clickhouse_execute(
|
|
args,
|
|
"SELECT name FROM system.merge_tree_settings WHERE changed",
|
|
)
|
|
.strip()
|
|
.splitlines()
|
|
)
|
|
|
|
return list(map(lambda s: s.decode(), changed_settings))
|
|
|
|
|
|
def check_table_column(args, database, table, column):
|
|
return (
|
|
int(
|
|
clickhouse_execute(
|
|
args,
|
|
f"""
|
|
SELECT count()
|
|
FROM system.columns
|
|
WHERE database = '{database}' AND table = '{table}' AND name = '{column}'
|
|
""",
|
|
)
|
|
)
|
|
> 0
|
|
)
|
|
|
|
|
|
def suite_key_func(item: str) -> Union[float, Tuple[int, str]]:
|
|
if args.order == "random":
|
|
return random.random()
|
|
|
|
if -1 == item.find("_"):
|
|
return 99998, ""
|
|
|
|
prefix, suffix = item.split("_", 1)
|
|
|
|
try:
|
|
return int(prefix), suffix
|
|
except ValueError:
|
|
return 99997, ""
|
|
|
|
|
|
def extract_key(key: str) -> str:
|
|
return subprocess.getstatusoutput(
|
|
args.extract_from_config + " --try --config " + args.configserver + key
|
|
)[1]
|
|
|
|
|
|
def do_run_tests(jobs, test_suite: TestSuite, parallel):
|
|
if jobs > 1 and len(test_suite.parallel_tests) > 0:
|
|
print(
|
|
"Found",
|
|
len(test_suite.parallel_tests),
|
|
"parallel tests and",
|
|
len(test_suite.sequential_tests),
|
|
"sequential tests",
|
|
)
|
|
run_n, run_total = parallel.split("/")
|
|
run_n = float(run_n)
|
|
run_total = float(run_total)
|
|
tests_n = len(test_suite.parallel_tests)
|
|
run_total = min(run_total, tests_n)
|
|
|
|
jobs = min(jobs, tests_n)
|
|
run_total = max(jobs, run_total)
|
|
|
|
batch_size = max(1, len(test_suite.parallel_tests) // jobs)
|
|
parallel_tests_array = []
|
|
for _ in range(jobs):
|
|
parallel_tests_array.append((None, batch_size, test_suite))
|
|
|
|
try:
|
|
with closing(multiprocessing.Pool(processes=jobs)) as pool:
|
|
pool.map_async(run_tests_array, parallel_tests_array)
|
|
|
|
for suit in test_suite.parallel_tests:
|
|
queue.put(suit, timeout=args.timeout * 1.1)
|
|
|
|
for _ in range(jobs):
|
|
queue.put(None, timeout=args.timeout * 1.1)
|
|
|
|
queue.close()
|
|
except Full:
|
|
print(
|
|
"Couldn't put test to the queue within timeout. Server probably hung."
|
|
)
|
|
print_stacktraces()
|
|
queue.close()
|
|
|
|
pool.join()
|
|
|
|
run_tests_array(
|
|
(test_suite.sequential_tests, len(test_suite.sequential_tests), test_suite)
|
|
)
|
|
return len(test_suite.sequential_tests) + len(test_suite.parallel_tests)
|
|
else:
|
|
num_tests = len(test_suite.all_tests)
|
|
run_tests_array((test_suite.all_tests, num_tests, test_suite))
|
|
return num_tests
|
|
|
|
|
|
def is_test_from_dir(suite_dir, case):
|
|
case_file = os.path.join(suite_dir, case)
|
|
# We could also test for executable files (os.access(case_file, os.X_OK),
|
|
# but it interferes with 01610_client_spawn_editor.editor, which is invoked
|
|
# as a query editor in the test, and must be marked as executable.
|
|
return os.path.isfile(case_file) and any(
|
|
case_file.endswith(suppotred_ext) for suppotred_ext in TEST_FILE_EXTENSIONS
|
|
)
|
|
|
|
|
|
def removesuffix(text, *suffixes):
|
|
"""
|
|
Added in python 3.9
|
|
https://www.python.org/dev/peps/pep-0616/
|
|
|
|
This version can work with several possible suffixes
|
|
"""
|
|
for suffix in suffixes:
|
|
if suffix and text.endswith(suffix):
|
|
return text[: -len(suffix)]
|
|
return text
|
|
|
|
|
|
def reportCoverageFor(args, what, query, permissive=False):
|
|
value = clickhouse_execute(args, query).decode()
|
|
|
|
if value != "":
|
|
print(f"\nThe following {what} were not covered by tests:\n")
|
|
print(value)
|
|
print("\n")
|
|
return permissive
|
|
|
|
return True
|
|
|
|
|
|
def reportCoverage(args):
|
|
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
|
|
|
|
return (
|
|
reportCoverageFor(
|
|
args,
|
|
"functions",
|
|
"""
|
|
SELECT name
|
|
FROM system.functions
|
|
WHERE NOT is_aggregate AND origin = 'System' AND alias_to = ''
|
|
AND name NOT IN
|
|
(
|
|
SELECT arrayJoin(used_functions) FROM system.query_log WHERE event_date >= yesterday()
|
|
)
|
|
ORDER BY name
|
|
""",
|
|
True,
|
|
)
|
|
and reportCoverageFor(
|
|
args,
|
|
"aggregate functions",
|
|
"""
|
|
SELECT name
|
|
FROM system.functions
|
|
WHERE is_aggregate AND origin = 'System' AND alias_to = ''
|
|
AND name NOT IN
|
|
(
|
|
SELECT arrayJoin(used_aggregate_functions) FROM system.query_log WHERE event_date >= yesterday()
|
|
)
|
|
ORDER BY name
|
|
""",
|
|
)
|
|
and reportCoverageFor(
|
|
args,
|
|
"aggregate function combinators",
|
|
"""
|
|
SELECT name
|
|
FROM system.aggregate_function_combinators
|
|
WHERE NOT is_internal
|
|
AND name NOT IN
|
|
(
|
|
SELECT arrayJoin(used_aggregate_function_combinators) FROM system.query_log WHERE event_date >= yesterday()
|
|
)
|
|
ORDER BY name
|
|
""",
|
|
)
|
|
and reportCoverageFor(
|
|
args,
|
|
"data type families",
|
|
"""
|
|
SELECT name
|
|
FROM system.data_type_families
|
|
WHERE alias_to = '' AND name NOT LIKE 'Interval%'
|
|
AND name NOT IN
|
|
(
|
|
SELECT arrayJoin(used_data_type_families) FROM system.query_log WHERE event_date >= yesterday()
|
|
)
|
|
ORDER BY name
|
|
""",
|
|
)
|
|
)
|
|
|
|
|
|
def reportLogStats(args):
|
|
clickhouse_execute(args, "SYSTEM FLUSH LOGS")
|
|
|
|
query = """
|
|
WITH
|
|
120 AS mins,
|
|
(
|
|
SELECT (count(), sum(length(message)))
|
|
FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(mins)) < event_time
|
|
) AS total
|
|
SELECT
|
|
count() AS count,
|
|
round(count / (total.1), 3) AS `count_%`,
|
|
formatReadableSize(sum(length(message))) AS size,
|
|
round(sum(length(message)) / (total.2), 3) AS `size_%`,
|
|
countDistinct(logger_name) AS uniq_loggers,
|
|
countDistinct(thread_id) AS uniq_threads,
|
|
groupArrayDistinct(toString(level)) AS levels,
|
|
round(sum(query_id = '') / count, 3) AS `background_%`,
|
|
message_format_string
|
|
FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(mins)) < event_time
|
|
GROUP BY message_format_string
|
|
ORDER BY count DESC
|
|
LIMIT 100
|
|
FORMAT TSVWithNamesAndTypes
|
|
"""
|
|
value = clickhouse_execute(args, query).decode(errors="replace")
|
|
print("\nTop patterns of log messages:\n")
|
|
print(value)
|
|
print("\n")
|
|
|
|
query = """
|
|
WITH
|
|
120 AS mins
|
|
SELECT
|
|
count() AS count,
|
|
substr(replaceRegexpAll(message, '[^A-Za-z]+', ''), 1, 32) AS pattern,
|
|
substr(any(message), 1, 256) as runtime_message,
|
|
any((extract(source_file, '\/[a-zA-Z0-9_]+\.[a-z]+'), source_line)) as line
|
|
FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(mins)) < event_time AND message_format_string = ''
|
|
GROUP BY pattern
|
|
ORDER BY count DESC
|
|
LIMIT 30
|
|
FORMAT TSVWithNamesAndTypes
|
|
"""
|
|
value = clickhouse_execute(args, query).decode(errors="replace")
|
|
print("\nTop messages without format string (fmt::runtime):\n")
|
|
print(value)
|
|
print("\n")
|
|
|
|
query = """
|
|
SELECT message_format_string, count(), substr(any(message), 1, 120) AS any_message
|
|
FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(120)) < event_time
|
|
AND (message NOT LIKE (replaceRegexpAll(message_format_string, '{[:.0-9dfx]*}', '%') AS s))
|
|
AND (message NOT LIKE concat('%Exception: ', s, '%'))
|
|
GROUP BY message_format_string ORDER BY count() DESC LIMIT 20 FORMAT TSVWithNamesAndTypes
|
|
"""
|
|
value = clickhouse_execute(args, query).decode(errors="replace")
|
|
print("\nTop messages that does not match its format string:\n")
|
|
print(value)
|
|
print("\n")
|
|
|
|
query = """
|
|
WITH ('', '({}) Keys: {}', '({}) {}', 'Aggregating', 'Became leader', 'Cleaning queue', 'Creating set.',
|
|
'Cyclic aliases', 'Detaching {}', 'Executing {}', 'Fire events: {}', 'Found part {}', 'Loaded queue',
|
|
'No sharding key', 'No tables', 'Query: {}', 'Removed', 'Removed part {}', 'Removing parts.',
|
|
'Request URI: {}', 'Sending part {}', 'Sent handshake', 'Starting {}', 'Will mimic {}', 'Writing to {}',
|
|
'dropIfEmpty', 'loadAll {}', '{} ({}:{})', '{} -> {}', '{} {}', '{}: {}', 'Query was cancelled',
|
|
'Table {} already exists', '{}%', 'Cancelled merging parts', 'All replicas are lost',
|
|
'Cancelled mutating parts', 'Read object: {}', 'New segment: {}', 'Unknown geometry type {}',
|
|
'Table {} is not replicated', '{} {}.{} already exists', 'Attempt to read after eof',
|
|
'Replica {} already exists', 'Convert overflow', 'key must be a tuple', 'Division by zero',
|
|
'No part {} in committed state', 'Files set to {}', 'Bytes set to {}', 'Sharding key {} is not used',
|
|
'Cannot parse datetime', 'Bad get: has {}, requested {}', 'There is no {} in {}', 'Numeric overflow'
|
|
) AS known_short_messages
|
|
SELECT count() AS c, message_format_string, substr(any(message), 1, 120)
|
|
FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(120)) < event_time
|
|
AND (length(message_format_string) < 16
|
|
OR (length(message_format_string) < 30 AND message ilike '%DB::Exception%'))
|
|
AND message_format_string NOT IN known_short_messages
|
|
GROUP BY message_format_string ORDER BY c DESC LIMIT 30 FORMAT TSVWithNamesAndTypes
|
|
"""
|
|
value = clickhouse_execute(args, query).decode(errors="replace")
|
|
print("\nTop short messages:\n")
|
|
print(value)
|
|
print("\n")
|
|
|
|
query = """
|
|
SELECT max((freq, message_format_string)), level
|
|
FROM (SELECT count() / (SELECT count() FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(120)) < event_time) AS freq,
|
|
min(level) AS level, message_format_string FROM system.text_log
|
|
WHERE (now() - toIntervalMinute(120)) < event_time
|
|
GROUP BY message_format_string ORDER BY freq DESC)
|
|
GROUP BY level
|
|
"""
|
|
value = clickhouse_execute(args, query).decode(errors="replace")
|
|
print("\nTop messages by level:\n")
|
|
print(value)
|
|
print("\n")
|
|
|
|
|
|
def main(args):
|
|
global server_died
|
|
global stop_time
|
|
global exit_code
|
|
global server_logs_level
|
|
global restarted_tests
|
|
|
|
if not check_server_started(args):
|
|
msg = "Server is not responding. Cannot execute 'SELECT 1' query."
|
|
if args.hung_check:
|
|
print(msg)
|
|
pid = get_server_pid()
|
|
print("Got server pid", pid)
|
|
print_stacktraces()
|
|
raise Exception(msg)
|
|
|
|
args.build_flags = collect_build_flags(args)
|
|
args.changed_merge_tree_settings = collect_changed_merge_tree_settings(args)
|
|
args.suppport_system_processes_is_all_data_sent = check_table_column(
|
|
args, "system", "processes", "is_all_data_sent"
|
|
)
|
|
|
|
if args.s3_storage and (
|
|
BuildFlags.THREAD in args.build_flags or BuildFlags.DEBUG in args.build_flags
|
|
):
|
|
args.no_random_settings = True
|
|
|
|
if args.skip:
|
|
args.skip = set(args.skip)
|
|
|
|
base_dir = os.path.abspath(args.queries)
|
|
|
|
# Keep same default values as in queries/shell_config.sh
|
|
os.environ.setdefault("CLICKHOUSE_BINARY", args.binary)
|
|
# os.environ.setdefault("CLICKHOUSE_CLIENT", args.client)
|
|
os.environ.setdefault("CLICKHOUSE_CONFIG", args.configserver)
|
|
|
|
if args.configclient:
|
|
os.environ.setdefault("CLICKHOUSE_CONFIG_CLIENT", args.configclient)
|
|
|
|
# Force to print server warnings in stderr
|
|
# Shell scripts could change logging level
|
|
os.environ.setdefault("CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL", server_logs_level)
|
|
|
|
# This code is bad as the time is not monotonic
|
|
if args.global_time_limit:
|
|
stop_time = time() + args.global_time_limit
|
|
|
|
if args.zookeeper is None:
|
|
args.zookeeper = True
|
|
|
|
if args.shard is None:
|
|
args.shard = bool(extract_key(' --key listen_host | grep -E "127.0.0.2|::"'))
|
|
|
|
def create_common_database(args, db_name):
|
|
create_database_retries = 0
|
|
while create_database_retries < MAX_RETRIES:
|
|
start_time = datetime.now()
|
|
try:
|
|
clickhouse_execute(
|
|
args,
|
|
f"CREATE DATABASE IF NOT EXISTS {db_name} "
|
|
f"{get_db_engine(args, db_name)}",
|
|
settings=get_create_database_settings(args, None),
|
|
)
|
|
except HTTPError as e:
|
|
total_time = (datetime.now() - start_time).total_seconds()
|
|
if not need_retry(args, e.message, e.message, total_time):
|
|
break
|
|
create_database_retries += 1
|
|
|
|
try:
|
|
if args.database and args.database != "test":
|
|
create_common_database(args, args.database)
|
|
|
|
create_common_database(args, "test")
|
|
except Exception as e:
|
|
print(f"Failed to create databases for tests: {e}")
|
|
server_died.set()
|
|
|
|
total_tests_run = 0
|
|
|
|
for suite in sorted(os.listdir(base_dir), key=suite_key_func):
|
|
if server_died.is_set():
|
|
break
|
|
|
|
test_suite = TestSuite.read_test_suite(args, suite)
|
|
if test_suite is None:
|
|
continue
|
|
|
|
total_tests_run += do_run_tests(args.jobs, test_suite, args.parallel)
|
|
|
|
if server_died.is_set():
|
|
exit_code.value = 1
|
|
|
|
if args.hung_check:
|
|
# Some queries may execute in background for some time after test was finished. This is normal.
|
|
for _ in range(1, 60):
|
|
processlist = get_processlist_with_stacktraces(args)
|
|
if not processlist:
|
|
break
|
|
sleep(1)
|
|
|
|
if processlist:
|
|
print(
|
|
colored(
|
|
"\nFound hung queries in processlist:", args, "red", attrs=["bold"]
|
|
)
|
|
)
|
|
print(json.dumps(processlist, indent=4))
|
|
print(get_transactions_list(args))
|
|
|
|
print_stacktraces()
|
|
exit_code.value = 1
|
|
else:
|
|
print(colored("\nNo queries hung.", args, "green", attrs=["bold"]))
|
|
|
|
if len(restarted_tests) > 0:
|
|
print("\nSome tests were restarted:\n")
|
|
|
|
for test_result in restarted_tests:
|
|
print(f"\n{test_result.case_name:72}: ")
|
|
# replace it with lowercase to avoid parsing retried tests as failed
|
|
for status in TestStatus:
|
|
test_result.description = test_result.description.replace(
|
|
status.value, status.value.lower()
|
|
)
|
|
print(test_result.description)
|
|
|
|
if total_tests_run == 0:
|
|
print("No tests were run.")
|
|
sys.exit(1)
|
|
else:
|
|
print("All tests have finished.")
|
|
|
|
if args.report_logs_stats:
|
|
try:
|
|
reportLogStats(args)
|
|
except Exception as e:
|
|
print(f"Failed to get stats about log messages: {e}")
|
|
|
|
if args.report_coverage and not reportCoverage(args):
|
|
exit_code.value = 1
|
|
|
|
sys.exit(exit_code.value)
|
|
|
|
|
|
def find_binary(name):
|
|
if os.access(name, os.X_OK):
|
|
return name
|
|
paths = os.environ.get("PATH").split(":")
|
|
for path in paths:
|
|
bin_path = os.path.join(path, name)
|
|
if os.access(bin_path, os.X_OK):
|
|
return bin_path
|
|
|
|
# maybe it wasn't in PATH
|
|
bin_path = os.path.join("/usr/local/bin", name)
|
|
if os.access(bin_path, os.X_OK):
|
|
return bin_path
|
|
bin_path = os.path.join("/usr/bin", name)
|
|
if os.access(bin_path, os.X_OK):
|
|
return bin_path
|
|
|
|
raise Exception(f"{name} was not found in PATH")
|
|
|
|
def find_clickhouse_command(binary, command):
|
|
symlink = binary + "-" + command
|
|
if os.access(symlink, os.X_OK):
|
|
return symlink
|
|
|
|
# To avoid requiring symlinks (in case you download binary from CI)
|
|
return binary + " " + command
|
|
|
|
def get_additional_client_options(args):
|
|
if args.client_option:
|
|
return " ".join("--" + option for option in args.client_option)
|
|
return ""
|
|
|
|
|
|
def get_additional_client_options_url(args):
|
|
if args.client_option:
|
|
return "&".join(args.client_option)
|
|
return ""
|
|
|
|
|
|
def parse_args():
|
|
parser = ArgumentParser(description="ClickHouse functional tests")
|
|
parser.add_argument("-q", "--queries", help="Path to queries dir")
|
|
parser.add_argument("--tmp", help="Path to tmp dir")
|
|
|
|
parser.add_argument(
|
|
"-b",
|
|
"--binary",
|
|
default=find_binary("clickhouse"),
|
|
help="Path to clickhouse binary or name of binary in PATH",
|
|
)
|
|
parser.add_argument(
|
|
"-c",
|
|
"--client",
|
|
help="Path to clickhouse-client, this option is useless"
|
|
"name of binary in PATH",
|
|
)
|
|
|
|
parser.add_argument("--extract_from_config", help="extract-from-config program")
|
|
parser.add_argument(
|
|
"--configclient", help="Client config (if you use not default ports)"
|
|
)
|
|
parser.add_argument(
|
|
"--configserver",
|
|
default="/etc/clickhouse-server/config.xml",
|
|
help="Preprocessed server config",
|
|
)
|
|
parser.add_argument(
|
|
"-o", "--output", help="Output xUnit compliant test report directory"
|
|
)
|
|
parser.add_argument(
|
|
"-t",
|
|
"--timeout",
|
|
type=int,
|
|
default=600,
|
|
help="Timeout for each test case in seconds",
|
|
)
|
|
parser.add_argument(
|
|
"--global_time_limit",
|
|
type=int,
|
|
help="Stop if executing more than specified time (after current test finished)",
|
|
)
|
|
parser.add_argument("test", nargs="*", help="Optional test case name regex")
|
|
parser.add_argument(
|
|
"-d",
|
|
"--disabled",
|
|
action="store_true",
|
|
default=False,
|
|
help="Also run disabled tests",
|
|
)
|
|
parser.add_argument(
|
|
"--stop",
|
|
action="store_true",
|
|
default=None,
|
|
dest="stop",
|
|
help="Stop on network errors",
|
|
)
|
|
parser.add_argument(
|
|
"--order", default="desc", choices=["asc", "desc", "random"], help="Run order"
|
|
)
|
|
parser.add_argument(
|
|
"--testname",
|
|
action="store_true",
|
|
default=None,
|
|
dest="testname",
|
|
help="Make query with test name before test run",
|
|
)
|
|
parser.add_argument("--hung-check", action="store_true", default=False)
|
|
parser.add_argument("--no-left-queries-check", action="store_true", default=False)
|
|
parser.add_argument("--force-color", action="store_true", default=False)
|
|
parser.add_argument(
|
|
"--database", help="Database for tests (random name test_XXXXXX by default)"
|
|
)
|
|
parser.add_argument(
|
|
"--no-drop-if-fail",
|
|
action="store_true",
|
|
help="Do not drop database for test if test has failed (does not work if reference file mismatch)",
|
|
)
|
|
parser.add_argument(
|
|
"--hide-db-name",
|
|
action="store_true",
|
|
help='Replace random database name with "default" in stderr',
|
|
)
|
|
parser.add_argument(
|
|
"--parallel", default="1/1", help="One parallel test run number/total"
|
|
)
|
|
parser.add_argument(
|
|
"-j", "--jobs", default=1, nargs="?", type=int, help="Run all tests in parallel"
|
|
)
|
|
parser.add_argument(
|
|
"--test-runs",
|
|
default=1,
|
|
nargs="?",
|
|
type=int,
|
|
help="Run each test many times (useful for e.g. flaky check)",
|
|
)
|
|
parser.add_argument(
|
|
"-U",
|
|
"--unified",
|
|
default=3,
|
|
type=int,
|
|
help="output NUM lines of unified context",
|
|
)
|
|
parser.add_argument(
|
|
"-r",
|
|
"--server-check-retries",
|
|
default=180,
|
|
type=int,
|
|
help="Num of tries to execute SELECT 1 before tests started",
|
|
)
|
|
parser.add_argument("--db-engine", help="Database engine name")
|
|
parser.add_argument(
|
|
"--replicated-database",
|
|
action="store_true",
|
|
default=False,
|
|
help="Run tests with Replicated database engine",
|
|
)
|
|
parser.add_argument(
|
|
"--fast-tests-only",
|
|
action="store_true",
|
|
default=False,
|
|
help='Run only fast tests (the tests without the "no-fasttest" tag)',
|
|
)
|
|
parser.add_argument(
|
|
"--no-stateless", action="store_true", help="Disable all stateless tests"
|
|
)
|
|
parser.add_argument(
|
|
"--no-stateful", action="store_true", help="Disable all stateful tests"
|
|
)
|
|
parser.add_argument("--skip", nargs="+", help="Skip these tests")
|
|
parser.add_argument(
|
|
"--sequential",
|
|
nargs="+",
|
|
help="Run these tests sequentially even if --parallel specified",
|
|
)
|
|
parser.add_argument(
|
|
"--no-long", action="store_true", dest="no_long", help="Do not run long tests"
|
|
)
|
|
parser.add_argument(
|
|
"--client-option", nargs="+", help="Specify additional client argument"
|
|
)
|
|
parser.add_argument(
|
|
"--print-time", action="store_true", dest="print_time", help="Print test time"
|
|
)
|
|
parser.add_argument(
|
|
"--check-zookeeper-session",
|
|
action="store_true",
|
|
help="Check ZooKeeper session uptime to determine if failed test should be retried",
|
|
)
|
|
parser.add_argument(
|
|
"--s3-storage",
|
|
action="store_true",
|
|
default=False,
|
|
help="Run tests over s3 storage",
|
|
)
|
|
parser.add_argument(
|
|
"--no-random-settings",
|
|
action="store_true",
|
|
default=False,
|
|
help="Disable settings randomization",
|
|
)
|
|
parser.add_argument(
|
|
"--no-random-merge-tree-settings",
|
|
action="store_true",
|
|
default=False,
|
|
help="Disable MergeTree settings randomization",
|
|
)
|
|
parser.add_argument(
|
|
"--run-by-hash-num",
|
|
type=int,
|
|
help="Run tests matching crc32(test_name) % run_by_hash_total == run_by_hash_num",
|
|
)
|
|
parser.add_argument(
|
|
"--run-by-hash-total",
|
|
type=int,
|
|
help="Total test groups for crc32(test_name) % run_by_hash_total == run_by_hash_num",
|
|
)
|
|
|
|
group = parser.add_mutually_exclusive_group(required=False)
|
|
group.add_argument(
|
|
"--zookeeper",
|
|
action="store_true",
|
|
default=None,
|
|
dest="zookeeper",
|
|
help="Run zookeeper related tests",
|
|
)
|
|
group.add_argument(
|
|
"--no-zookeeper",
|
|
action="store_false",
|
|
default=None,
|
|
dest="zookeeper",
|
|
help="Do not run zookeeper related tests",
|
|
)
|
|
|
|
group = parser.add_mutually_exclusive_group(required=False)
|
|
group.add_argument(
|
|
"--shard",
|
|
action="store_true",
|
|
default=None,
|
|
dest="shard",
|
|
help="Run sharding related tests "
|
|
"(required to clickhouse-server listen 127.0.0.2 127.0.0.3)",
|
|
)
|
|
group.add_argument(
|
|
"--no-shard",
|
|
action="store_false",
|
|
default=None,
|
|
dest="shard",
|
|
help="Do not run shard related tests",
|
|
)
|
|
|
|
group.add_argument(
|
|
"--upgrade-check",
|
|
action="store_true",
|
|
help="Run tests for further server upgrade testing by ignoring all"
|
|
"drop queries in tests for collecting data from new version of server",
|
|
)
|
|
parser.add_argument(
|
|
"--secure",
|
|
action="store_true",
|
|
default=False,
|
|
help="Use secure connection to connect to clickhouse-server",
|
|
)
|
|
parser.add_argument(
|
|
"--max-failures-chain",
|
|
default=20,
|
|
type=int,
|
|
help="Max number of failed tests in a row (stop tests if higher)",
|
|
)
|
|
parser.add_argument(
|
|
"--report-coverage",
|
|
action="store_true",
|
|
default=False,
|
|
help="Check what high-level server components were covered by tests",
|
|
)
|
|
parser.add_argument(
|
|
"--report-logs-stats",
|
|
action="store_true",
|
|
default=False,
|
|
help="Report statistics about log messages",
|
|
)
|
|
parser.add_argument(
|
|
"--no-parallel-replicas",
|
|
action="store_true",
|
|
default=False,
|
|
help="Do not include tests that are not supported with parallel replicas feature",
|
|
)
|
|
|
|
return parser.parse_args()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
stop_time = None
|
|
exit_code = multiprocessing.Value("i", 0)
|
|
server_died = multiprocessing.Event()
|
|
stop_tests_triggered_lock = multiprocessing.Lock()
|
|
stop_tests_triggered = multiprocessing.Event()
|
|
queue = multiprocessing.Queue(maxsize=1)
|
|
multiprocessing_manager = multiprocessing.Manager()
|
|
restarted_tests = multiprocessing_manager.list()
|
|
|
|
# Move to a new process group and kill it at exit so that we don't have any
|
|
# infinite tests processes left
|
|
# (new process group is required to avoid killing some parent processes)
|
|
os.setpgid(0, 0)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGHUP, signal_handler)
|
|
|
|
try:
|
|
args = parse_args()
|
|
except Exception as e:
|
|
print(e, file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
if args.queries and not os.path.isdir(args.queries):
|
|
print(
|
|
f"Cannot access the specified directory with queries ({args.queries})",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
# Autodetect the directory with queries if not specified
|
|
if args.queries is None:
|
|
args.queries = "queries"
|
|
|
|
if not os.path.isdir(args.queries):
|
|
# If we're running from the repo
|
|
args.queries = os.path.join(
|
|
os.path.dirname(os.path.abspath(__file__)), "queries"
|
|
)
|
|
|
|
if not os.path.isdir(args.queries):
|
|
# Next we're going to try some system directories, don't write 'stdout' files into them.
|
|
if args.tmp is None:
|
|
args.tmp = "/tmp/clickhouse-test"
|
|
|
|
args.queries = "/usr/local/share/clickhouse-test/queries"
|
|
|
|
if not os.path.isdir(args.queries):
|
|
args.queries = "/usr/share/clickhouse-test/queries"
|
|
|
|
if not os.path.isdir(args.queries):
|
|
print(
|
|
"Failed to detect path to the queries directory. Please specify it with "
|
|
"'--queries' option.",
|
|
file=sys.stderr,
|
|
)
|
|
sys.exit(1)
|
|
|
|
print("Using queries from '" + args.queries + "' directory")
|
|
|
|
if args.tmp is None:
|
|
args.tmp = args.queries
|
|
|
|
if args.client:
|
|
print(
|
|
"WARNING: --client option is deprecated and will be removed the the future, use --binary instead",
|
|
file=sys.stderr,
|
|
)
|
|
|
|
args.client = find_clickhouse_command(args.binary, "client")
|
|
|
|
if args.extract_from_config:
|
|
print(
|
|
"WARNING: --extract_from_config option is deprecated and will be removed the the future",
|
|
file=sys.stderr,
|
|
)
|
|
args.extract_from_config = find_clickhouse_command(args.binary, "extract-from-config")
|
|
|
|
if args.configclient:
|
|
args.client += " --config-file=" + args.configclient
|
|
|
|
tcp_host = os.getenv("CLICKHOUSE_HOST")
|
|
if tcp_host is not None:
|
|
args.tcp_host = tcp_host
|
|
args.client += f" --host={tcp_host}"
|
|
else:
|
|
args.tcp_host = "localhost"
|
|
|
|
tcp_port = os.getenv("CLICKHOUSE_PORT_TCP")
|
|
if tcp_port is not None:
|
|
args.tcp_port = int(tcp_port)
|
|
args.client += f" --port={tcp_port}"
|
|
else:
|
|
args.tcp_port = 9440 if args.secure else 9000
|
|
if args.secure:
|
|
os.environ["CLICKHOUSE_PORT_TCP"] = str(args.tcp_port)
|
|
|
|
http_port = os.getenv("CLICKHOUSE_PORT_HTTP")
|
|
if http_port is not None:
|
|
args.http_port = int(http_port)
|
|
else:
|
|
args.http_port = 8443 if args.secure else 8123
|
|
os.environ["CLICKHOUSE_PORT_HTTP"] = str(args.http_port)
|
|
if args.secure and os.getenv("CLICKHOUSE_PORT_HTTP_PROTO") is None:
|
|
os.environ["CLICKHOUSE_PORT_HTTP_PROTO"] = "https"
|
|
|
|
client_database = os.getenv("CLICKHOUSE_DATABASE")
|
|
if client_database is not None:
|
|
args.client += f" --database={client_database}"
|
|
args.client_database = client_database
|
|
else:
|
|
args.client_database = "default"
|
|
|
|
if args.upgrade_check:
|
|
args.client += " --fake-drop"
|
|
|
|
if args.client_option or args.secure:
|
|
# Set options for client
|
|
if "CLICKHOUSE_CLIENT_OPT" in os.environ:
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] += " "
|
|
else:
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] = ""
|
|
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] += get_additional_client_options(args)
|
|
if args.secure:
|
|
os.environ["CLICKHOUSE_CLIENT_OPT"] += " --secure "
|
|
|
|
# Set options for curl
|
|
if "CLICKHOUSE_URL_PARAMS" in os.environ:
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] += "&"
|
|
else:
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] = ""
|
|
|
|
client_options_query_str = get_additional_client_options_url(args)
|
|
args.client_options_query_str = client_options_query_str + "&"
|
|
os.environ["CLICKHOUSE_URL_PARAMS"] += client_options_query_str
|
|
else:
|
|
args.client_options_query_str = ""
|
|
|
|
if args.jobs is None:
|
|
args.jobs = multiprocessing.cpu_count()
|
|
|
|
if args.db_engine and args.db_engine == "Ordinary":
|
|
MESSAGES_TO_RETRY.append(" locking attempt on ")
|
|
|
|
main(args)
|