mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
8545ce1111
Fixing ldap tests to use large tail value to accomodate changes in exception length.
417 lines
16 KiB
Python
Executable File
417 lines
16 KiB
Python
Executable File
import os
|
|
import time
|
|
import inspect
|
|
import threading
|
|
import tempfile
|
|
|
|
from testflows.core import *
|
|
from testflows.asserts import error
|
|
from testflows.connect import Shell
|
|
from testflows.uexpect import ExpectTimeoutError
|
|
|
|
class QueryRuntimeException(Exception):
|
|
"""Exception during query execution on the server.
|
|
"""
|
|
pass
|
|
|
|
class Node(object):
|
|
"""Generic cluster node.
|
|
"""
|
|
config_d_dir = "/etc/clickhouse-server/config.d/"
|
|
|
|
def __init__(self, cluster, name):
|
|
self.cluster = cluster
|
|
self.name = name
|
|
|
|
def repr(self):
|
|
return f"Node(name='{self.name}')"
|
|
|
|
def restart(self, timeout=300, retries=5):
|
|
"""Restart node.
|
|
"""
|
|
with self.cluster.lock:
|
|
for key in list(self.cluster._bash.keys()):
|
|
if key.endswith(f"-{self.name}"):
|
|
shell = self.cluster._bash.pop(key)
|
|
shell.__exit__(None, None, None)
|
|
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
def start(self, timeout=300, retries=5):
|
|
"""Start node.
|
|
"""
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} start {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
def stop(self, timeout=300, retries=5):
|
|
"""Stop node.
|
|
"""
|
|
with self.cluster.lock:
|
|
for key in list(self.cluster._bash.keys()):
|
|
if key.endswith(f"-{self.name}"):
|
|
shell = self.cluster._bash.pop(key)
|
|
shell.__exit__(None, None, None)
|
|
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
def command(self, *args, **kwargs):
|
|
return self.cluster.command(self.name, *args, **kwargs)
|
|
|
|
class ClickHouseNode(Node):
|
|
"""Node with ClickHouse server.
|
|
"""
|
|
def wait_healthy(self, timeout=300):
|
|
with By(f"waiting until container {self.name} is healthy"):
|
|
start_time = time.time()
|
|
while True:
|
|
if self.query("select 1", no_checks=1, timeout=300, steps=False).exitcode == 0:
|
|
break
|
|
if time.time() - start_time < timeout:
|
|
time.sleep(2)
|
|
continue
|
|
assert False, "container is not healthy"
|
|
|
|
def stop(self, timeout=300, safe=True, retries=5):
|
|
"""Stop node.
|
|
"""
|
|
if safe:
|
|
self.query("SYSTEM STOP MOVES")
|
|
self.query("SYSTEM STOP MERGES")
|
|
self.query("SYSTEM FLUSH LOGS")
|
|
with By("waiting for 5 sec for moves and merges to stop"):
|
|
time.sleep(5)
|
|
with And("forcing to sync everything to disk"):
|
|
self.command("sync", timeout=30)
|
|
|
|
with self.cluster.lock:
|
|
for key in list(self.cluster._bash.keys()):
|
|
if key.endswith(f"-{self.name}"):
|
|
shell = self.cluster._bash.pop(key)
|
|
shell.__exit__(None, None, None)
|
|
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
def start(self, timeout=300, wait_healthy=True, retries=5):
|
|
"""Start node.
|
|
"""
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} start {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
if wait_healthy:
|
|
self.wait_healthy(timeout)
|
|
|
|
def restart(self, timeout=300, safe=True, wait_healthy=True, retries=5):
|
|
"""Restart node.
|
|
"""
|
|
if safe:
|
|
self.query("SYSTEM STOP MOVES")
|
|
self.query("SYSTEM STOP MERGES")
|
|
self.query("SYSTEM FLUSH LOGS")
|
|
with By("waiting for 5 sec for moves and merges to stop"):
|
|
time.sleep(5)
|
|
with And("forcing to sync everything to disk"):
|
|
self.command("sync", timeout=30)
|
|
|
|
with self.cluster.lock:
|
|
for key in list(self.cluster._bash.keys()):
|
|
if key.endswith(f"-{self.name}"):
|
|
shell = self.cluster._bash.pop(key)
|
|
shell.__exit__(None, None, None)
|
|
|
|
for retry in range(retries):
|
|
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
|
|
if r.exitcode == 0:
|
|
break
|
|
|
|
if wait_healthy:
|
|
self.wait_healthy(timeout)
|
|
|
|
def query(self, sql, message=None, exitcode=None, steps=True, no_checks=False,
|
|
raise_on_exception=False, step=By, settings=None, *args, **kwargs):
|
|
"""Execute and check query.
|
|
:param sql: sql query
|
|
:param message: expected message that should be in the output, default: None
|
|
:param exitcode: expected exitcode, default: None
|
|
"""
|
|
settings = list(settings or [])
|
|
|
|
if hasattr(current().context, "default_query_settings"):
|
|
settings += current().context.default_query_settings
|
|
|
|
if len(sql) > 1024:
|
|
with tempfile.NamedTemporaryFile("w", encoding="utf-8") as query:
|
|
query.write(sql)
|
|
query.flush()
|
|
command = f"cat \"{query.name}\" | {self.cluster.docker_compose} exec -T {self.name} clickhouse client -n"
|
|
for setting in settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
description = f"""
|
|
echo -e \"{sql[:100]}...\" > {query.name}
|
|
{command}
|
|
"""
|
|
with step("executing command", description=description, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(None)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(None)
|
|
else:
|
|
command = f"echo -e \"{sql}\" | clickhouse client -n"
|
|
for setting in settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
with step("executing command", description=command, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(self.name)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(self.name)
|
|
raise
|
|
|
|
if no_checks:
|
|
return r
|
|
|
|
if exitcode is not None:
|
|
with Then(f"exitcode should be {exitcode}") if steps else NullStep():
|
|
assert r.exitcode == exitcode, error(r.output)
|
|
|
|
if message is not None:
|
|
with Then(f"output should contain message", description=message) if steps else NullStep():
|
|
assert message in r.output, error(r.output)
|
|
|
|
if message is None or "Exception:" not in message:
|
|
with Then("check if output has exception") if steps else NullStep():
|
|
if "Exception:" in r.output:
|
|
if raise_on_exception:
|
|
raise QueryRuntimeException(r.output)
|
|
assert False, error(r.output)
|
|
|
|
return r
|
|
|
|
class Cluster(object):
|
|
"""Simple object around docker-compose cluster.
|
|
"""
|
|
def __init__(self, local=False,
|
|
clickhouse_binary_path=None, configs_dir=None,
|
|
nodes=None,
|
|
docker_compose="docker-compose", docker_compose_project_dir=None,
|
|
docker_compose_file="docker-compose.yml"):
|
|
|
|
self.terminating = False
|
|
self._bash = {}
|
|
self.clickhouse_binary_path = clickhouse_binary_path
|
|
self.configs_dir = configs_dir
|
|
self.local = local
|
|
self.nodes = nodes or {}
|
|
self.docker_compose = docker_compose
|
|
|
|
frame = inspect.currentframe().f_back
|
|
caller_dir = os.path.dirname(os.path.abspath(frame.f_globals["__file__"]))
|
|
|
|
# auto set configs directory
|
|
if self.configs_dir is None:
|
|
caller_configs_dir = caller_dir
|
|
if os.path.exists(caller_configs_dir):
|
|
self.configs_dir = caller_configs_dir
|
|
|
|
if not os.path.exists(self.configs_dir):
|
|
raise TypeError("configs directory '{self.configs_dir}' does not exist")
|
|
|
|
# auto set docker-compose project directory
|
|
if docker_compose_project_dir is None:
|
|
caller_project_dir = os.path.join(caller_dir, "docker-compose")
|
|
if os.path.exists(caller_project_dir):
|
|
docker_compose_project_dir = caller_project_dir
|
|
|
|
docker_compose_file_path = os.path.join(docker_compose_project_dir or "", docker_compose_file)
|
|
|
|
if not os.path.exists(docker_compose_file_path):
|
|
raise TypeError("docker compose file '{docker_compose_file_path}' does not exist")
|
|
|
|
self.docker_compose += f" --no-ansi --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
|
|
self.lock = threading.Lock()
|
|
|
|
def shell(self, node, timeout=300):
|
|
"""Returns unique shell terminal to be used.
|
|
"""
|
|
if node is None:
|
|
return Shell()
|
|
|
|
shell = Shell(command=[
|
|
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting"
|
|
], name=node)
|
|
|
|
shell.timeout = timeout
|
|
return shell
|
|
|
|
def bash(self, node, timeout=300):
|
|
"""Returns thread-local bash terminal
|
|
to a specific node.
|
|
:param node: name of the service
|
|
"""
|
|
test = current()
|
|
|
|
if self.terminating:
|
|
if test and (test.cflags & MANDATORY):
|
|
pass
|
|
else:
|
|
raise InterruptedError("terminating")
|
|
|
|
current_thread = threading.current_thread()
|
|
id = f"{current_thread.name}-{node}"
|
|
|
|
with self.lock:
|
|
if self._bash.get(id) is None:
|
|
if node is None:
|
|
self._bash[id] = Shell().__enter__()
|
|
else:
|
|
self._bash[id] = Shell(command=[
|
|
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting"
|
|
], name=node).__enter__()
|
|
|
|
self._bash[id].timeout = timeout
|
|
|
|
# clean up any stale open shells for threads that have exited
|
|
active_thread_names = {thread.name for thread in threading.enumerate()}
|
|
|
|
for bash_id in list(self._bash.keys()):
|
|
thread_name, node_name = bash_id.rsplit("-", 1)
|
|
if thread_name not in active_thread_names:
|
|
self._bash[bash_id].__exit__(None, None, None)
|
|
del self._bash[bash_id]
|
|
|
|
return self._bash[id]
|
|
|
|
def close_bash(self, node):
|
|
current_thread = threading.current_thread()
|
|
id = f"{current_thread.name}-{node}"
|
|
|
|
with self.lock:
|
|
if self._bash.get(id) is None:
|
|
return
|
|
self._bash[id].__exit__(None, None, None)
|
|
del self._bash[id]
|
|
|
|
def __enter__(self):
|
|
with Given("docker-compose cluster"):
|
|
self.up()
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
try:
|
|
with Finally("I clean up"):
|
|
self.down()
|
|
finally:
|
|
with self.lock:
|
|
for shell in self._bash.values():
|
|
shell.__exit__(type, value, traceback)
|
|
|
|
def node(self, name):
|
|
"""Get object with node bound methods.
|
|
:param name: name of service name
|
|
"""
|
|
if name.startswith("clickhouse"):
|
|
return ClickHouseNode(self, name)
|
|
return Node(self, name)
|
|
|
|
def down(self, timeout=300):
|
|
"""Bring cluster down by executing docker-compose down."""
|
|
self.terminating = True
|
|
|
|
try:
|
|
bash = self.bash(None)
|
|
with self.lock:
|
|
# remove and close all not None node terminals
|
|
for id in list(self._bash.keys()):
|
|
shell = self._bash.pop(id)
|
|
if shell is not bash:
|
|
shell.__exit__(None, None, None)
|
|
else:
|
|
self._bash[id] = shell
|
|
finally:
|
|
return self.command(None, f"{self.docker_compose} down --timeout 60", bash=bash, timeout=timeout)
|
|
|
|
def up(self, timeout=30*60):
|
|
if self.local:
|
|
with Given("I am running in local mode"):
|
|
with Then("check --clickhouse-binary-path is specified"):
|
|
assert self.clickhouse_binary_path, "when running in local mode then --clickhouse-binary-path must be specified"
|
|
with And("path should exist"):
|
|
assert os.path.exists(self.clickhouse_binary_path)
|
|
|
|
with And("I set all the necessary environment variables"):
|
|
os.environ["COMPOSE_HTTP_TIMEOUT"] = "300"
|
|
os.environ["CLICKHOUSE_TESTS_SERVER_BIN_PATH"] = self.clickhouse_binary_path
|
|
os.environ["CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"] = os.path.join(
|
|
os.path.dirname(self.clickhouse_binary_path), "clickhouse-odbc-bridge")
|
|
os.environ["CLICKHOUSE_TESTS_DIR"] = self.configs_dir
|
|
|
|
with And("I list environment variables to show their values"):
|
|
self.command(None, "env | grep CLICKHOUSE")
|
|
|
|
with Given("docker-compose"):
|
|
max_attempts = 5
|
|
for attempt in range(max_attempts):
|
|
with When(f"attempt {attempt}/{max_attempts}"):
|
|
with By("pulling images for all the services"):
|
|
cmd = self.command(None, f"{self.docker_compose} pull 2>&1 | tee", exitcode=None, timeout=timeout)
|
|
if cmd.exitcode != 0:
|
|
continue
|
|
with And("executing docker-compose down just in case it is up"):
|
|
cmd = self.command(None, f"{self.docker_compose} down --remove-orphans 2>&1 | tee", exitcode=None, timeout=timeout)
|
|
if cmd.exitcode != 0:
|
|
continue
|
|
with And("executing docker-compose up"):
|
|
cmd = self.command(None, f"{self.docker_compose} up -d 2>&1 | tee", timeout=timeout)
|
|
|
|
with Then("check there are no unhealthy containers"):
|
|
if "is unhealthy" in cmd.output:
|
|
self.command(None, f"{self.docker_compose} ps | tee")
|
|
self.command(None, f"{self.docker_compose} logs | tee")
|
|
|
|
if cmd.exitcode == 0:
|
|
break
|
|
|
|
if cmd.exitcode != 0:
|
|
fail("could not bring up docker-compose cluster")
|
|
|
|
with Then("wait all nodes report healhy"):
|
|
for name in self.nodes["clickhouse"]:
|
|
self.node(name).wait_healthy()
|
|
|
|
def command(self, node, command, message=None, exitcode=None, steps=True, bash=None, *args, **kwargs):
|
|
"""Execute and check command.
|
|
:param node: name of the service
|
|
:param command: command
|
|
:param message: expected message that should be in the output, default: None
|
|
:param exitcode: expected exitcode, default: None
|
|
:param steps: don't break command into steps, default: True
|
|
"""
|
|
with By("executing command", description=command, format_description=False) if steps else NullStep():
|
|
if bash is None:
|
|
bash = self.bash(node)
|
|
try:
|
|
r = bash(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.close_bash(node)
|
|
raise
|
|
if exitcode is not None:
|
|
with Then(f"exitcode should be {exitcode}", format_name=False) if steps else NullStep():
|
|
assert r.exitcode == exitcode, error(r.output)
|
|
if message is not None:
|
|
with Then(f"output should contain message", description=message, format_description=False) if steps else NullStep():
|
|
assert message in r.output, error(r.output)
|
|
return r
|