ClickHouse/tests/testflows/helpers/cluster.py

455 lines
18 KiB
Python
Executable File

import os
import time
import inspect
import threading
import tempfile
from testflows.core import *
from testflows.asserts import error
from testflows.connect import Shell
from testflows.uexpect import ExpectTimeoutError
class QueryRuntimeException(Exception):
"""Exception during query execution on the server.
"""
pass
class Node(object):
"""Generic cluster node.
"""
config_d_dir = "/etc/clickhouse-server/config.d/"
def __init__(self, cluster, name):
self.cluster = cluster
self.name = name
def repr(self):
return f"Node(name='{self.name}')"
def restart(self, timeout=300, retries=5, safe=True):
"""Restart node.
"""
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
if r.exitcode == 0:
break
def start(self, timeout=300, retries=5):
"""Start node.
"""
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} start {self.name}', timeout=timeout)
if r.exitcode == 0:
break
def stop(self, timeout=300, retries=5, safe=True):
"""Stop node.
"""
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
if r.exitcode == 0:
break
def command(self, *args, **kwargs):
return self.cluster.command(self.name, *args, **kwargs)
def cmd(self, cmd, message=None, exitcode=None, steps=True, shell_command="bash --noediting", no_checks=False,
raise_on_exception=False, step=By, *args, **kwargs):
"""Execute and check command.
:param cmd: command
:param message: expected message that should be in the output, default: None
:param exitcode: expected exitcode, default: None
"""
command = f"{cmd}"
with Step("executing command", description=command, format_description=False) if steps else NullStep():
try:
r = self.cluster.bash(self.name, command=shell_command)(command, *args, **kwargs)
except ExpectTimeoutError:
self.cluster.close_bash(self.name)
raise
if no_checks:
return r
if exitcode is not None:
with Then(f"exitcode should be {exitcode}") if steps else NullStep():
assert r.exitcode == exitcode, error(r.output)
if message is not None:
with Then(f"output should contain message", description=message) if steps else NullStep():
assert message in r.output, error(r.output)
if message is None or "Exception:" not in message:
with Then("check if output has exception") if steps else NullStep():
if "Exception:" in r.output:
if raise_on_exception:
raise QueryRuntimeException(r.output)
assert False, error(r.output)
return r
class ClickHouseNode(Node):
"""Node with ClickHouse server.
"""
def wait_healthy(self, timeout=300):
with By(f"waiting until container {self.name} is healthy"):
start_time = time.time()
while True:
if self.query("select 1", no_checks=1, timeout=300, steps=False).exitcode == 0:
break
if time.time() - start_time < timeout:
time.sleep(2)
continue
assert False, "container is not healthy"
def stop(self, timeout=300, safe=True, retries=5):
"""Stop node.
"""
if safe:
self.query("SYSTEM STOP MOVES")
self.query("SYSTEM STOP MERGES")
self.query("SYSTEM FLUSH LOGS")
with By("waiting for 5 sec for moves and merges to stop"):
time.sleep(5)
with And("forcing to sync everything to disk"):
self.command("sync", timeout=30)
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
if r.exitcode == 0:
break
def start(self, timeout=300, wait_healthy=True, retries=5):
"""Start node.
"""
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} start {self.name}', timeout=timeout)
if r.exitcode == 0:
break
if wait_healthy:
self.wait_healthy(timeout)
def restart(self, timeout=300, safe=True, wait_healthy=True, retries=5):
"""Restart node.
"""
if safe:
self.query("SYSTEM STOP MOVES")
self.query("SYSTEM STOP MERGES")
self.query("SYSTEM FLUSH LOGS")
with By("waiting for 5 sec for moves and merges to stop"):
time.sleep(5)
with And("forcing to sync everything to disk"):
self.command("sync", timeout=30)
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
if r.exitcode == 0:
break
if wait_healthy:
self.wait_healthy(timeout)
def query(self, sql, message=None, exitcode=None, steps=True, no_checks=False,
raise_on_exception=False, step=By, settings=None, *args, **kwargs):
"""Execute and check query.
:param sql: sql query
:param message: expected message that should be in the output, default: None
:param exitcode: expected exitcode, default: None
"""
settings = list(settings or [])
if hasattr(current().context, "default_query_settings"):
settings += current().context.default_query_settings
if len(sql) > 1024:
with tempfile.NamedTemporaryFile("w", encoding="utf-8") as query:
query.write(sql)
query.flush()
command = f"cat \"{query.name}\" | {self.cluster.docker_compose} exec -T {self.name} clickhouse client -n"
for setting in settings:
name, value = setting
command += f" --{name} \"{value}\""
description = f"""
echo -e \"{sql[:100]}...\" > {query.name}
{command}
"""
with Step("executing command", description=description, format_description=False) if steps else NullStep():
try:
r = self.cluster.bash(None)(command, *args, **kwargs)
except ExpectTimeoutError:
self.cluster.close_bash(None)
else:
command = f"echo -e \"{sql}\" | clickhouse client -n"
for setting in settings:
name, value = setting
command += f" --{name} \"{value}\""
with Step("executing command", description=command, format_description=False) if steps else NullStep():
try:
r = self.cluster.bash(self.name)(command, *args, **kwargs)
except ExpectTimeoutError:
self.cluster.close_bash(self.name)
raise
if no_checks:
return r
if exitcode is not None:
with Then(f"exitcode should be {exitcode}") if steps else NullStep():
assert r.exitcode == exitcode, error(r.output)
if message is not None:
with Then(f"output should contain message", description=message) if steps else NullStep():
assert message in r.output, error(r.output)
if message is None or "Exception:" not in message:
with Then("check if output has exception") if steps else NullStep():
if "Exception:" in r.output:
if raise_on_exception:
raise QueryRuntimeException(r.output)
assert False, error(r.output)
return r
class Cluster(object):
"""Simple object around docker-compose cluster.
"""
def __init__(self, local=False,
clickhouse_binary_path=None, configs_dir=None,
nodes=None,
docker_compose="docker-compose", docker_compose_project_dir=None,
docker_compose_file="docker-compose.yml"):
self.terminating = False
self._bash = {}
self.clickhouse_binary_path = clickhouse_binary_path
self.configs_dir = configs_dir
self.local = local
self.nodes = nodes or {}
self.docker_compose = docker_compose
frame = inspect.currentframe().f_back
caller_dir = os.path.dirname(os.path.abspath(frame.f_globals["__file__"]))
# auto set configs directory
if self.configs_dir is None:
caller_configs_dir = caller_dir
if os.path.exists(caller_configs_dir):
self.configs_dir = caller_configs_dir
if not os.path.exists(self.configs_dir):
raise TypeError(f"configs directory '{self.configs_dir}' does not exist")
# auto set docker-compose project directory
if docker_compose_project_dir is None:
caller_project_dir = os.path.join(caller_dir, "docker-compose")
if os.path.exists(caller_project_dir):
docker_compose_project_dir = caller_project_dir
docker_compose_file_path = os.path.join(docker_compose_project_dir or "", docker_compose_file)
if not os.path.exists(docker_compose_file_path):
raise TypeError("docker compose file '{docker_compose_file_path}' does not exist")
self.docker_compose += f" --no-ansi --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
self.lock = threading.Lock()
def shell(self, node, timeout=300):
"""Returns unique shell terminal to be used.
"""
if node is None:
return Shell()
shell = Shell(command=[
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting"
], name=node)
shell.timeout = timeout
return shell
def bash(self, node, timeout=300, command="bash --noediting"):
"""Returns thread-local bash terminal
to a specific node.
:param node: name of the service
"""
test = current()
if self.terminating:
if test and (test.cflags & MANDATORY):
pass
else:
raise InterruptedError("terminating")
current_thread = threading.current_thread()
id = f"{current_thread.name}-{node}"
with self.lock:
if self._bash.get(id) is None:
if node is None:
self._bash[id] = Shell().__enter__()
else:
self._bash[id] = Shell(command=[
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} {command}"
], name=node).__enter__()
self._bash[id].timeout = timeout
# clean up any stale open shells for threads that have exited
active_thread_names = {thread.name for thread in threading.enumerate()}
for bash_id in list(self._bash.keys()):
thread_name, node_name = bash_id.rsplit("-", 1)
if thread_name not in active_thread_names:
self._bash[bash_id].__exit__(None, None, None)
del self._bash[bash_id]
return self._bash[id]
def close_bash(self, node):
current_thread = threading.current_thread()
id = f"{current_thread.name}-{node}"
with self.lock:
if self._bash.get(id) is None:
return
self._bash[id].__exit__(None, None, None)
del self._bash[id]
def __enter__(self):
with Given("docker-compose cluster"):
self.up()
return self
def __exit__(self, type, value, traceback):
try:
with Finally("I clean up"):
self.down()
finally:
with self.lock:
for shell in self._bash.values():
shell.__exit__(type, value, traceback)
def node(self, name):
"""Get object with node bound methods.
:param name: name of service name
"""
if name.startswith("clickhouse"):
return ClickHouseNode(self, name)
return Node(self, name)
def down(self, timeout=300):
"""Bring cluster down by executing docker-compose down."""
self.terminating = True
try:
bash = self.bash(None)
with self.lock:
# remove and close all not None node terminals
for id in list(self._bash.keys()):
shell = self._bash.pop(id)
if shell is not bash:
shell.__exit__(None, None, None)
else:
self._bash[id] = shell
finally:
return self.command(None, f"{self.docker_compose} down --timeout 60", bash=bash, timeout=timeout)
def up(self, timeout=30*60):
if self.local:
with Given("I am running in local mode"):
with Then("check --clickhouse-binary-path is specified"):
assert self.clickhouse_binary_path, "when running in local mode then --clickhouse-binary-path must be specified"
with And("path should exist"):
assert os.path.exists(self.clickhouse_binary_path)
with And("I set all the necessary environment variables"):
os.environ["COMPOSE_HTTP_TIMEOUT"] = "300"
os.environ["CLICKHOUSE_TESTS_SERVER_BIN_PATH"] = self.clickhouse_binary_path
os.environ["CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"] = os.path.join(
os.path.dirname(self.clickhouse_binary_path), "clickhouse-odbc-bridge")
os.environ["CLICKHOUSE_TESTS_DIR"] = self.configs_dir
with And("I list environment variables to show their values"):
self.command(None, "env | grep CLICKHOUSE")
with Given("docker-compose"):
max_attempts = 5
for attempt in range(max_attempts):
with When(f"attempt {attempt}/{max_attempts}"):
with By("pulling images for all the services"):
cmd = self.command(None, f"{self.docker_compose} pull 2>&1 | tee", exitcode=None, timeout=timeout)
if cmd.exitcode != 0:
continue
with And("executing docker-compose down just in case it is up"):
cmd = self.command(None, f"{self.docker_compose} down --remove-orphans 2>&1 | tee", exitcode=None, timeout=timeout)
if cmd.exitcode != 0:
continue
with And("executing docker-compose up"):
cmd = self.command(None, f"{self.docker_compose} up -d 2>&1 | tee", timeout=timeout)
with Then("check there are no unhealthy containers"):
if "is unhealthy" in cmd.output:
self.command(None, f"{self.docker_compose} ps | tee")
self.command(None, f"{self.docker_compose} logs | tee")
if cmd.exitcode == 0:
break
if cmd.exitcode != 0:
fail("could not bring up docker-compose cluster")
with Then("wait all nodes report healhy"):
for name in self.nodes["clickhouse"]:
self.node(name).wait_healthy()
def command(self, node, command, message=None, exitcode=None, steps=True, bash=None, *args, **kwargs):
"""Execute and check command.
:param node: name of the service
:param command: command
:param message: expected message that should be in the output, default: None
:param exitcode: expected exitcode, default: None
:param steps: don't break command into steps, default: True
"""
with By("executing command", description=command, format_description=False) if steps else NullStep():
if bash is None:
bash = self.bash(node)
try:
r = bash(command, *args, **kwargs)
except ExpectTimeoutError:
self.close_bash(node)
raise
if exitcode is not None:
with Then(f"exitcode should be {exitcode}", format_name=False) if steps else NullStep():
assert r.exitcode == exitcode, error(r.output)
if message is not None:
with Then(f"output should contain message", description=message, format_description=False) if steps else NullStep():
assert message in r.output, error(r.output)
return r