mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 05:03:20 +00:00
862 lines
37 KiB
Python
Executable File
862 lines
37 KiB
Python
Executable File
import os
|
|
import uuid
|
|
import time
|
|
import inspect
|
|
import threading
|
|
import tempfile
|
|
|
|
from testflows._core.cli.arg.common import description
|
|
|
|
import testflows.settings as settings
|
|
|
|
from testflows.core import *
|
|
from testflows.asserts import error
|
|
from testflows.connect import Shell as ShellBase
|
|
from testflows.uexpect import ExpectTimeoutError
|
|
from testflows._core.testtype import TestSubType
|
|
|
|
MESSAGES_TO_RETRY = [
|
|
"DB::Exception: ZooKeeper session has been expired",
|
|
"DB::Exception: Connection loss",
|
|
"Coordination::Exception: Session expired",
|
|
"Coordination::Exception: Connection loss",
|
|
"Coordination::Exception: Operation timeout",
|
|
"DB::Exception: Operation timeout",
|
|
"Operation timed out",
|
|
"ConnectionPoolWithFailover: Connection failed at try",
|
|
"DB::Exception: New table appeared in database being dropped or detached. Try again",
|
|
"is already started to be removing by another replica right now",
|
|
"Shutdown is called for table", # happens in SYSTEM SYNC REPLICA query if session with ZooKeeper is being reinitialized.
|
|
"is executing longer than distributed_ddl_task_timeout" # distributed TTL timeout message
|
|
]
|
|
|
|
class Shell(ShellBase):
|
|
def __exit__(self, type, value, traceback):
|
|
# send exit and Ctrl-D repeatedly
|
|
# to terminate any open shell commands.
|
|
# This is needed for example
|
|
# to solve a problem with
|
|
# 'docker-compose exec {name} bash --noediting'
|
|
# that does not clean up open bash processes
|
|
# if not exited normally
|
|
for i in range(10):
|
|
if self.child is not None:
|
|
try:
|
|
self.send('exit\r', eol='')
|
|
self.send('\x04\r', eol='')
|
|
except OSError:
|
|
pass
|
|
return super(Shell, self).__exit__(type, value, traceback)
|
|
|
|
class QueryRuntimeException(Exception):
|
|
"""Exception during query execution on the server.
|
|
"""
|
|
pass
|
|
|
|
class Node(object):
|
|
"""Generic cluster node.
|
|
"""
|
|
config_d_dir = "/etc/clickhouse-server/config.d/"
|
|
|
|
def __init__(self, cluster, name):
|
|
self.cluster = cluster
|
|
self.name = name
|
|
|
|
def repr(self):
|
|
return f"Node(name='{self.name}')"
|
|
|
|
def close_bashes(self):
|
|
"""Close all active bashes to the node.
|
|
"""
|
|
with self.cluster.lock:
|
|
for key in list(self.cluster._bash.keys()):
|
|
if key.endswith(f"-{self.name}"):
|
|
shell = self.cluster._bash.pop(key)
|
|
shell.__exit__(None, None, None)
|
|
|
|
def wait_healthy(self, timeout=300):
|
|
with By(f"waiting until container {self.name} is healthy"):
|
|
for attempt in retries(timeout=timeout, delay=1):
|
|
with attempt:
|
|
if self.command("echo 1", no_checks=1, steps=False).exitcode != 0:
|
|
fail("container is not healthy")
|
|
|
|
def restart(self, timeout=300, retry_count=5, safe=True):
|
|
"""Restart node.
|
|
"""
|
|
self.close_bashes()
|
|
retry(self.cluster.command, retry_count)(
|
|
None, f'{self.cluster.docker_compose} restart {self.name}',
|
|
timeout=timeout, exitcode=0, steps=False)
|
|
|
|
def start(self, timeout=300, retry_count=5):
|
|
"""Start node.
|
|
"""
|
|
retry(self.cluster.command, retry_count)(
|
|
None, f'{self.cluster.docker_compose} start {self.name}',
|
|
timeout=timeout, exitcode=0, steps=False)
|
|
|
|
def stop(self, timeout=300, retry_count=5, safe=True):
|
|
"""Stop node.
|
|
"""
|
|
self.close_bashes()
|
|
|
|
retry(self.cluster.command, retry_count)(
|
|
None, f'{self.cluster.docker_compose} stop {self.name}',
|
|
timeout=timeout, exitcode=0, steps=False)
|
|
|
|
def command(self, *args, **kwargs):
|
|
return self.cluster.command(self.name, *args, **kwargs)
|
|
|
|
def cmd(self, cmd, message=None, exitcode=None, steps=True, shell_command="bash --noediting", no_checks=False,
|
|
raise_on_exception=False, step=By, *args, **kwargs):
|
|
"""Execute and check command.
|
|
:param cmd: command
|
|
:param message: expected message that should be in the output, default: None
|
|
:param exitcode: expected exitcode, default: None
|
|
"""
|
|
|
|
command = f"{cmd}"
|
|
with step("executing command", description=command, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(self.name, command=shell_command)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(self.name)
|
|
raise
|
|
|
|
if no_checks:
|
|
return r
|
|
|
|
if exitcode is not None:
|
|
with Then(f"exitcode should be {exitcode}") if steps else NullStep():
|
|
assert r.exitcode == exitcode, error(r.output)
|
|
|
|
if message is not None:
|
|
with Then(f"output should contain message", description=message) if steps else NullStep():
|
|
assert message in r.output, error(r.output)
|
|
|
|
return r
|
|
|
|
|
|
class ClickHouseNode(Node):
|
|
"""Node with ClickHouse server.
|
|
"""
|
|
def thread_fuzzer(self):
|
|
with Given("exporting THREAD_FUZZER"):
|
|
self.command("export THREAD_FUZZER_CPU_TIME_PERIOD_US=1000")
|
|
self.command("export THREAD_FUZZER_SLEEP_PROBABILITY=0.1")
|
|
self.command("export THREAD_FUZZER_SLEEP_TIME_US=100000")
|
|
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_BEFORE_MIGRATE_PROBABILITY=1")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_AFTER_MIGRATE_PROBABILITY=1")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_MIGRATE_PROBABILITY=1")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_AFTER_MIGRATE_PROBABILITY=1")
|
|
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_PROBABILITY=0.001")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_PROBABILITY=0.001")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_PROBABILITY=0.001")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_PROBABILITY=0.001")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_BEFORE_SLEEP_TIME_US=10000")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_lock_AFTER_SLEEP_TIME_US=10000")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_BEFORE_SLEEP_TIME_US=10000")
|
|
self.command("export THREAD_FUZZER_pthread_mutex_unlock_AFTER_SLEEP_TIME_US=10000")
|
|
|
|
def wait_clickhouse_healthy(self, timeout=300):
|
|
with By(f"waiting until ClickHouse server on {self.name} is healthy"):
|
|
for attempt in retries(timeout=timeout, delay=1):
|
|
with attempt:
|
|
if self.query("SELECT version()", no_checks=1, steps=False).exitcode != 0:
|
|
fail("ClickHouse server is not healthy")
|
|
node_version = self.query("SELECT version()", no_checks=1, steps=False).output
|
|
if current().context.clickhouse_version is None:
|
|
current().context.clickhouse_version = node_version
|
|
else:
|
|
assert current().context.clickhouse_version == node_version, error()
|
|
|
|
def clickhouse_pid(self):
|
|
"""Return ClickHouse server pid if present
|
|
otherwise return None.
|
|
"""
|
|
if self.command("ls /tmp/clickhouse-server.pid").exitcode == 0:
|
|
return self.command("cat /tmp/clickhouse-server.pid").output.strip()
|
|
return None
|
|
|
|
def stop_clickhouse(self, timeout=300, safe=True):
|
|
"""Stop ClickHouse server.
|
|
"""
|
|
if safe:
|
|
self.query("SYSTEM STOP MOVES")
|
|
self.query("SYSTEM STOP MERGES")
|
|
self.query("SYSTEM FLUSH LOGS")
|
|
with By("waiting for 5 sec for moves and merges to stop"):
|
|
time.sleep(5)
|
|
with And("forcing to sync everything to disk"):
|
|
self.command("sync", timeout=300, exitcode=0)
|
|
|
|
with By(f"sending kill -TERM to ClickHouse server process on {self.name}"):
|
|
pid = self.clickhouse_pid()
|
|
self.command(f"kill -TERM {pid}", exitcode=0, steps=False)
|
|
|
|
with And("checking pid does not exist"):
|
|
for i, attempt in enumerate(retries(timeout=100, delay=3)):
|
|
with attempt:
|
|
if i > 0 and i % 20 == 0:
|
|
self.command(f"kill -KILL {pid}", steps=False)
|
|
if self.command(f"ps {pid}", steps=False, no_checks=True).exitcode != 1:
|
|
fail("pid still alive")
|
|
|
|
with And("deleting ClickHouse server pid file"):
|
|
self.command("rm -rf /tmp/clickhouse-server.pid", exitcode=0, steps=False)
|
|
|
|
def start_clickhouse(self, timeout=300, wait_healthy=True, retry_count=5, user=None, thread_fuzzer=False):
|
|
"""Start ClickHouse server.
|
|
"""
|
|
pid = self.clickhouse_pid()
|
|
if pid:
|
|
raise RuntimeError(f"ClickHouse server already running with pid {pid}")
|
|
|
|
if thread_fuzzer:
|
|
self.thread_fuzzer()
|
|
|
|
if user is None:
|
|
with By("starting ClickHouse server process"):
|
|
self.command(
|
|
"clickhouse server --config-file=/etc/clickhouse-server/config.xml"
|
|
" --log-file=/var/log/clickhouse-server/clickhouse-server.log"
|
|
" --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log"
|
|
" --pidfile=/tmp/clickhouse-server.pid --daemon",
|
|
exitcode=0, steps=False)
|
|
else:
|
|
with By(f"starting ClickHouse server process from {user}"):
|
|
self.command(f'su {user} -c'
|
|
'"clickhouse server --config-file=/etc/clickhouse-server/config.xml'
|
|
' --log-file=/var/log/clickhouse-server/clickhouse-server.log'
|
|
' --errorlog-file=/var/log/clickhouse-server/clickhouse-server.err.log'
|
|
' --pidfile=/tmp/clickhouse-server.pid --daemon"',
|
|
exitcode=0, steps=False)
|
|
|
|
with And("checking that ClickHouse server pid file was created"):
|
|
for attempt in retries(timeout=timeout, delay=1):
|
|
with attempt:
|
|
if self.command("ls /tmp/clickhouse-server.pid", steps=False, no_checks=True).exitcode != 0:
|
|
fail("no pid file yet")
|
|
|
|
if wait_healthy:
|
|
self.wait_clickhouse_healthy(timeout=timeout)
|
|
|
|
def restart_clickhouse(self, timeout=300, safe=True, wait_healthy=True, retry_count=5, user=None):
|
|
"""Restart ClickHouse server.
|
|
"""
|
|
if self.clickhouse_pid():
|
|
self.stop_clickhouse(timeout=timeout, safe=safe)
|
|
|
|
self.start_clickhouse(timeout=timeout, wait_healthy=wait_healthy, user=user)
|
|
|
|
def stop(self, timeout=300, safe=True, retry_count=5):
|
|
"""Stop node.
|
|
"""
|
|
if self.clickhouse_pid():
|
|
self.stop_clickhouse(timeout=timeout, safe=safe)
|
|
|
|
return super(ClickHouseNode, self).stop(timeout=timeout, retry_count=retry_count)
|
|
|
|
def start(self, timeout=300, start_clickhouse=True, wait_healthy=True, retry_count=5, user=None):
|
|
"""Start node.
|
|
"""
|
|
super(ClickHouseNode, self).start(timeout=timeout, retry_count=retry_count)
|
|
|
|
if start_clickhouse:
|
|
self.start_clickhouse(timeout=timeout, wait_healthy=wait_healthy, user=user,)
|
|
|
|
def restart(self, timeout=300, safe=True, start_clickhouse=True,
|
|
wait_healthy=True, retry_count=5, user=None):
|
|
"""Restart node.
|
|
"""
|
|
if self.clickhouse_pid():
|
|
self.stop_clickhouse(timeout=timeout, safe=safe)
|
|
|
|
super(ClickHouseNode, self).restart(timeout=timeout, retry_count=retry_count)
|
|
|
|
if start_clickhouse:
|
|
self.start_clickhouse(timeout=timeout, wait_healthy=wait_healthy, user=user)
|
|
|
|
def hash_query(self, sql, hash_utility="sha1sum", steps=True, step=By,
|
|
settings=None, secure=False, *args, **kwargs):
|
|
"""Execute sql query inside the container and return the hash of the output.
|
|
|
|
:param sql: sql query
|
|
:param hash_utility: hash function which used to compute hash
|
|
"""
|
|
settings = list(settings or [])
|
|
query_settings = list(settings)
|
|
|
|
if hasattr(current().context, "default_query_settings"):
|
|
query_settings += current().context.default_query_settings
|
|
|
|
client = "clickhouse client -n"
|
|
if secure:
|
|
client += " -s"
|
|
|
|
if len(sql) > 1024:
|
|
with tempfile.NamedTemporaryFile("w", encoding="utf-8") as query:
|
|
query.write(sql)
|
|
query.flush()
|
|
command = f"set -o pipefail && cat \"{query.name}\" | {self.cluster.docker_compose} exec -T {self.name} {client} | {hash_utility}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
description = f"""
|
|
echo -e \"{sql[:100]}...\" > {query.name}
|
|
{command}
|
|
"""
|
|
with step("executing command", description=description,
|
|
format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(None)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(None)
|
|
else:
|
|
command = f"set -o pipefail && echo -e \"{sql}\" | {client} | {hash_utility}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
with step("executing command", description=command,
|
|
format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(self.name)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(self.name)
|
|
|
|
with Then(f"exitcode should be 0") if steps else NullStep():
|
|
assert r.exitcode == 0, error(r.output)
|
|
|
|
return r.output
|
|
|
|
def diff_query(self, sql, expected_output, steps=True, step=By,
|
|
settings=None, secure=False, *args, **kwargs):
|
|
"""Execute inside the container but from the host and compare its output
|
|
to file that is located on the host.
|
|
|
|
For example:
|
|
diff <(echo "SELECT * FROM myints FORMAT CSVWithNames" | clickhouse-client -mn) select.out
|
|
|
|
:param sql: sql query
|
|
:param expected_output: path to the expected output
|
|
"""
|
|
settings = list(settings or [])
|
|
query_settings = list(settings)
|
|
|
|
if hasattr(current().context, "default_query_settings"):
|
|
query_settings += current().context.default_query_settings
|
|
|
|
client = "clickhouse client -n"
|
|
if secure:
|
|
client += " -s"
|
|
|
|
if len(sql) > 1024:
|
|
with tempfile.NamedTemporaryFile("w", encoding="utf-8") as query:
|
|
query.write(sql)
|
|
query.flush()
|
|
command = f"diff <(cat \"{query.name}\" | {self.cluster.docker_compose} exec -T {self.name} {client}) {expected_output}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
description = f"""
|
|
echo -e \"{sql[:100]}...\" > {query.name}
|
|
{command}
|
|
"""
|
|
with step("executing command", description=description, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(None)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(None)
|
|
else:
|
|
command = f"diff <(echo -e \"{sql}\" | {self.cluster.docker_compose} exec -T {self.name} {client}) {expected_output}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
with step("executing command", description=command,
|
|
format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(None)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(None)
|
|
|
|
with Then(f"exitcode should be 0") if steps else NullStep():
|
|
assert r.exitcode == 0, error(r.output)
|
|
|
|
def query(self, sql, message=None, exitcode=None, steps=True, no_checks=False,
|
|
raise_on_exception=False, step=By, settings=None,
|
|
retry_count=5, messages_to_retry=None, retry_delay=5, secure=False,
|
|
*args, **kwargs):
|
|
"""Execute and check query.
|
|
:param sql: sql query
|
|
:param message: expected message that should be in the output, default: None
|
|
:param exitcode: expected exitcode, default: None
|
|
:param steps: wrap query execution in a step, default: True
|
|
:param no_check: disable exitcode and message checks, default: False
|
|
:param step: wrapping step class, default: By
|
|
:param settings: list of settings to be used for the query in the form [(name, value),...], default: None
|
|
:param retry_count: number of retries, default: 5
|
|
:param messages_to_retry: list of messages in the query output for
|
|
which retry should be triggered, default: MESSAGES_TO_RETRY
|
|
:param retry_delay: number of seconds to sleep before retry, default: 5
|
|
:param secure: use secure connection, default: False
|
|
"""
|
|
retry_count = max(0, int(retry_count))
|
|
retry_delay = max(0, float(retry_delay))
|
|
settings = list(settings or [])
|
|
query_settings = list(settings)
|
|
|
|
if messages_to_retry is None:
|
|
messages_to_retry = MESSAGES_TO_RETRY
|
|
|
|
if hasattr(current().context, "default_query_settings"):
|
|
query_settings += current().context.default_query_settings
|
|
|
|
client = "clickhouse client -n"
|
|
if secure:
|
|
client += " -s"
|
|
|
|
if len(sql) > 1024:
|
|
with tempfile.NamedTemporaryFile("w", encoding="utf-8") as query:
|
|
query.write(sql)
|
|
query.flush()
|
|
command = f"cat \"{query.name}\" | {self.cluster.docker_compose} exec -T {self.name} {client}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
description = f"""
|
|
echo -e \"{sql[:100]}...\" > {query.name}
|
|
{command}
|
|
"""
|
|
with step("executing command", description=description, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(None)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(None)
|
|
raise
|
|
else:
|
|
command = f"echo -e \"{sql}\" | {client}"
|
|
for setting in query_settings:
|
|
name, value = setting
|
|
command += f" --{name} \"{value}\""
|
|
with step("executing command", description=command, format_description=False) if steps else NullStep():
|
|
try:
|
|
r = self.cluster.bash(self.name)(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.cluster.close_bash(self.name)
|
|
raise
|
|
|
|
if retry_count and retry_count > 0:
|
|
if any(msg in r.output for msg in messages_to_retry):
|
|
time.sleep(retry_delay)
|
|
return self.query(sql=sql, message=message, exitcode=exitcode,
|
|
steps=steps, no_checks=no_checks,
|
|
raise_on_exception=raise_on_exception, step=step, settings=settings,
|
|
retry_count=retry_count-1, messages_to_retry=messages_to_retry,
|
|
*args, **kwargs)
|
|
|
|
if no_checks:
|
|
return r
|
|
|
|
if exitcode is not None:
|
|
with Then(f"exitcode should be {exitcode}") if steps else NullStep():
|
|
assert r.exitcode == exitcode, error(r.output)
|
|
|
|
if message is not None:
|
|
with Then(f"output should contain message", description=message) if steps else NullStep():
|
|
assert message in r.output, error(r.output)
|
|
|
|
if message is None or "Exception:" not in message:
|
|
with Then("check if output has exception") if steps else NullStep():
|
|
if "Exception:" in r.output:
|
|
if raise_on_exception:
|
|
raise QueryRuntimeException(r.output)
|
|
assert False, error(r.output)
|
|
|
|
return r
|
|
|
|
|
|
class Cluster(object):
|
|
"""Simple object around docker-compose cluster.
|
|
"""
|
|
def __init__(self, local=False,
|
|
clickhouse_binary_path=None,
|
|
clickhouse_odbc_bridge_binary_path=None,
|
|
configs_dir=None,
|
|
nodes=None,
|
|
docker_compose="docker-compose", docker_compose_project_dir=None,
|
|
docker_compose_file="docker-compose.yml",
|
|
environ=None):
|
|
|
|
self._bash = {}
|
|
self._control_shell = None
|
|
self.environ = {} if (environ is None) else environ
|
|
self.clickhouse_binary_path = clickhouse_binary_path
|
|
self.clickhouse_odbc_bridge_binary_path = clickhouse_odbc_bridge_binary_path
|
|
self.configs_dir = configs_dir
|
|
self.local = local
|
|
self.nodes = nodes or {}
|
|
self.docker_compose = docker_compose
|
|
|
|
frame = inspect.currentframe().f_back
|
|
caller_dir = os.path.dirname(os.path.abspath(frame.f_globals["__file__"]))
|
|
|
|
# auto set configs directory
|
|
if self.configs_dir is None:
|
|
caller_configs_dir = caller_dir
|
|
if os.path.exists(caller_configs_dir):
|
|
self.configs_dir = caller_configs_dir
|
|
|
|
if not os.path.exists(self.configs_dir):
|
|
raise TypeError(f"configs directory '{self.configs_dir}' does not exist")
|
|
|
|
if docker_compose_project_dir is None:
|
|
raise TypeError("docker compose directory must be specified.")
|
|
|
|
docker_compose_file_path = os.path.join(docker_compose_project_dir or "", docker_compose_file)
|
|
|
|
if not os.path.exists(docker_compose_file_path):
|
|
raise TypeError(f"docker compose file '{docker_compose_file_path}' does not exist")
|
|
|
|
if self.clickhouse_binary_path and self.clickhouse_binary_path.startswith("docker://"):
|
|
if current().context.clickhouse_version is None:
|
|
try:
|
|
current().context.clickhouse_version = self.clickhouse_binary_path.split(":")[2]
|
|
debug(f"auto setting clickhouse version to {current().context.clickhouse_version}")
|
|
except IndexError:
|
|
current().context.clickhouse_version = None
|
|
self.clickhouse_binary_path, self.clickhouse_odbc_bridge_binary_path = self.get_clickhouse_binary_from_docker_container(
|
|
self.clickhouse_binary_path)
|
|
|
|
self.docker_compose += f" --ansi never --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
|
|
self.lock = threading.Lock()
|
|
|
|
def get_clickhouse_binary_from_docker_container(self, docker_image,
|
|
container_clickhouse_binary_path="/usr/bin/clickhouse",
|
|
container_clickhouse_odbc_bridge_binary_path="/usr/bin/clickhouse-odbc-bridge",
|
|
host_clickhouse_binary_path=None,
|
|
host_clickhouse_odbc_bridge_binary_path=None):
|
|
"""Get clickhouse-server and clickhouse-odbc-bridge binaries
|
|
from some Docker container.
|
|
"""
|
|
docker_image = docker_image.split("docker://", 1)[-1]
|
|
docker_container_name = str(uuid.uuid1())
|
|
|
|
if host_clickhouse_binary_path is None:
|
|
host_clickhouse_binary_path = os.path.join(tempfile.gettempdir(), f"{docker_image.rsplit('/',1)[-1].replace(':','_')}")
|
|
|
|
if host_clickhouse_odbc_bridge_binary_path is None:
|
|
host_clickhouse_odbc_bridge_binary_path = host_clickhouse_binary_path + "_odbc_bridge"
|
|
|
|
with Given("I get ClickHouse server binary from docker container", description=f"{docker_image}"):
|
|
with Shell() as bash:
|
|
bash.timeout = 300
|
|
bash(f"docker run -d --name \"{docker_container_name}\" {docker_image} | tee")
|
|
bash(f"docker cp \"{docker_container_name}:{container_clickhouse_binary_path}\" \"{host_clickhouse_binary_path}\"")
|
|
bash(f"docker cp \"{docker_container_name}:{container_clickhouse_odbc_bridge_binary_path}\" \"{host_clickhouse_odbc_bridge_binary_path}\"")
|
|
bash(f"docker stop \"{docker_container_name}\"")
|
|
|
|
return host_clickhouse_binary_path, host_clickhouse_odbc_bridge_binary_path
|
|
|
|
@property
|
|
def control_shell(self, timeout=300):
|
|
"""Must be called with self.lock.acquired.
|
|
"""
|
|
if self._control_shell is not None:
|
|
return self._control_shell
|
|
|
|
time_start = time.time()
|
|
while True:
|
|
try:
|
|
shell = Shell()
|
|
shell.timeout = 30
|
|
shell("echo 1")
|
|
break
|
|
except IOError:
|
|
raise
|
|
except Exception as exc:
|
|
shell.__exit__(None, None, None)
|
|
if time.time() - time_start > timeout:
|
|
raise RuntimeError(f"failed to open control shell")
|
|
self._control_shell = shell
|
|
return self._control_shell
|
|
|
|
def close_control_shell(self):
|
|
"""Must be called with self.lock.acquired.
|
|
"""
|
|
if self._control_shell is None:
|
|
return
|
|
shell = self._control_shell
|
|
self._control_shell = None
|
|
shell.__exit__(None, None, None)
|
|
|
|
def node_container_id(self, node, timeout=300):
|
|
"""Must be called with self.lock acquired.
|
|
"""
|
|
container_id = None
|
|
time_start = time.time()
|
|
while True:
|
|
try:
|
|
c = self.control_shell(f"{self.docker_compose} ps -q {node}", timeout=timeout)
|
|
container_id = c.output.strip()
|
|
if c.exitcode == 0 and len(container_id) > 1:
|
|
break
|
|
except IOError:
|
|
raise
|
|
except ExpectTimeoutError:
|
|
self.close_control_shell()
|
|
timeout = timeout - (time.time() - time_start)
|
|
if timeout <= 0:
|
|
raise RuntimeError(f"failed to get docker container id for the {node} service")
|
|
return container_id
|
|
|
|
def shell(self, node, timeout=300):
|
|
"""Returns unique shell terminal to be used.
|
|
"""
|
|
container_id = None
|
|
|
|
if node is not None:
|
|
with self.lock:
|
|
container_id = self.node_container_id(node=node, timeout=timeout)
|
|
|
|
time_start = time.time()
|
|
while True:
|
|
try:
|
|
if node is None:
|
|
shell = Shell()
|
|
else:
|
|
shell = Shell(command=[
|
|
"/bin/bash", "--noediting", "-c", f"docker exec -it {container_id} bash --noediting"
|
|
], name=node)
|
|
shell.timeout = 30
|
|
shell("echo 1")
|
|
break
|
|
except IOError:
|
|
raise
|
|
except Exception as exc:
|
|
shell.__exit__(None, None, None)
|
|
if time.time() - time_start > timeout:
|
|
raise RuntimeError(f"failed to open bash to node {node}")
|
|
|
|
shell.timeout = timeout
|
|
return shell
|
|
|
|
def bash(self, node, timeout=300, command="bash --noediting"):
|
|
"""Returns thread-local bash terminal
|
|
to a specific node.
|
|
:param node: name of the service
|
|
"""
|
|
test = current()
|
|
|
|
current_thread = threading.current_thread()
|
|
id = f"{current_thread.name}-{node}"
|
|
|
|
with self.lock:
|
|
if self._bash.get(id) is None:
|
|
if node is not None:
|
|
container_id = self.node_container_id(node=node, timeout=timeout)
|
|
|
|
time_start = time.time()
|
|
while True:
|
|
try:
|
|
if node is None:
|
|
self._bash[id] = Shell()
|
|
else:
|
|
self._bash[id] = Shell(command=[
|
|
"/bin/bash", "--noediting", "-c", f"docker exec -it {container_id} {command}"
|
|
], name=node).__enter__()
|
|
self._bash[id].timeout = 30
|
|
self._bash[id]("echo 1")
|
|
break
|
|
except IOError:
|
|
raise
|
|
except Exception as exc:
|
|
self._bash[id].__exit__(None, None, None)
|
|
if time.time() - time_start > timeout:
|
|
raise RuntimeError(f"failed to open bash to node {node}")
|
|
|
|
if node is None:
|
|
for name,value in self.environ.items():
|
|
self._bash[id](f"export {name}={value}")
|
|
|
|
self._bash[id].timeout = timeout
|
|
|
|
# clean up any stale open shells for threads that have exited
|
|
active_thread_names = {thread.name for thread in threading.enumerate()}
|
|
|
|
for bash_id in list(self._bash.keys()):
|
|
thread_name, node_name = bash_id.rsplit("-", 1)
|
|
if thread_name not in active_thread_names:
|
|
self._bash[bash_id].__exit__(None, None, None)
|
|
del self._bash[bash_id]
|
|
|
|
return self._bash[id]
|
|
|
|
def close_bash(self, node):
|
|
current_thread = threading.current_thread()
|
|
id = f"{current_thread.name}-{node}"
|
|
|
|
with self.lock:
|
|
if self._bash.get(id) is None:
|
|
return
|
|
self._bash[id].__exit__(None, None, None)
|
|
del self._bash[id]
|
|
|
|
def __enter__(self):
|
|
with Given("docker-compose cluster"):
|
|
self.up()
|
|
return self
|
|
|
|
def __exit__(self, type, value, traceback):
|
|
try:
|
|
with Finally("I clean up"):
|
|
self.down()
|
|
finally:
|
|
with self.lock:
|
|
for shell in self._bash.values():
|
|
shell.__exit__(type, value, traceback)
|
|
|
|
def node(self, name):
|
|
"""Get object with node bound methods.
|
|
:param name: name of service name
|
|
"""
|
|
if name.startswith("clickhouse"):
|
|
return ClickHouseNode(self, name)
|
|
return Node(self, name)
|
|
|
|
def down(self, timeout=300):
|
|
"""Bring cluster down by executing docker-compose down."""
|
|
|
|
# add message to each clickhouse-server.log
|
|
if settings.debug:
|
|
for node in self.nodes["clickhouse"]:
|
|
self.command(node=node, command=f"echo -e \"\n-- sending stop to: {node} --\n\" >> /var/log/clickhouse-server/clickhouse-server.log")
|
|
try:
|
|
bash = self.bash(None)
|
|
with self.lock:
|
|
# remove and close all not None node terminals
|
|
for id in list(self._bash.keys()):
|
|
shell = self._bash.pop(id)
|
|
if shell is not bash:
|
|
shell.__exit__(None, None, None)
|
|
else:
|
|
self._bash[id] = shell
|
|
finally:
|
|
cmd = self.command(None, f"{self.docker_compose} down -v --remove-orphans --timeout 60", bash=bash, timeout=timeout)
|
|
with self.lock:
|
|
if self._control_shell:
|
|
self._control_shell.__exit__(None, None, None)
|
|
self._control_shell = None
|
|
return cmd
|
|
|
|
def temp_path(self):
|
|
"""Return temporary folder path.
|
|
"""
|
|
p = f"{self.environ['CLICKHOUSE_TESTS_DIR']}/_temp"
|
|
if not os.path.exists(p):
|
|
os.mkdir(p)
|
|
return p
|
|
|
|
def temp_file(self, name):
|
|
"""Return absolute temporary file path.
|
|
"""
|
|
return f"{os.path.join(self.temp_path(), name)}"
|
|
|
|
def up(self, timeout=30*60):
|
|
if self.local:
|
|
with Given("I am running in local mode"):
|
|
with Then("check --clickhouse-binary-path is specified"):
|
|
assert self.clickhouse_binary_path, "when running in local mode then --clickhouse-binary-path must be specified"
|
|
with And("path should exist"):
|
|
assert os.path.exists(self.clickhouse_binary_path)
|
|
|
|
with And("I set all the necessary environment variables"):
|
|
self.environ["COMPOSE_HTTP_TIMEOUT"] = "300"
|
|
self.environ["CLICKHOUSE_TESTS_SERVER_BIN_PATH"] = self.clickhouse_binary_path
|
|
self.environ["CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"] = self.clickhouse_odbc_bridge_binary_path or os.path.join(
|
|
os.path.dirname(self.clickhouse_binary_path), "clickhouse-odbc-bridge")
|
|
self.environ["CLICKHOUSE_TESTS_DIR"] = self.configs_dir
|
|
|
|
with And("I list environment variables to show their values"):
|
|
self.command(None, "env | grep CLICKHOUSE")
|
|
|
|
with Given("docker-compose"):
|
|
max_attempts = 5
|
|
max_up_attempts = 1
|
|
|
|
for attempt in range(max_attempts):
|
|
with When(f"attempt {attempt}/{max_attempts}"):
|
|
with By("pulling images for all the services"):
|
|
cmd = self.command(None, f"{self.docker_compose} pull 2>&1 | tee", exitcode=None, timeout=timeout)
|
|
if cmd.exitcode != 0:
|
|
continue
|
|
|
|
with And("checking if any containers are already running"):
|
|
self.command(None, f"{self.docker_compose} ps | tee")
|
|
|
|
with And("executing docker-compose down just in case it is up"):
|
|
cmd = self.command(None, f"{self.docker_compose} down 2>&1 | tee", exitcode=None, timeout=timeout)
|
|
if cmd.exitcode != 0:
|
|
continue
|
|
|
|
with And("checking if any containers are still left running"):
|
|
self.command(None, f"{self.docker_compose} ps | tee")
|
|
|
|
with And("executing docker-compose up"):
|
|
for up_attempt in range(max_up_attempts):
|
|
with By(f"attempt {up_attempt}/{max_up_attempts}"):
|
|
cmd = self.command(None, f"{self.docker_compose} up --renew-anon-volumes --force-recreate --timeout 300 -d 2>&1 | tee", timeout=timeout)
|
|
if "is unhealthy" not in cmd.output:
|
|
break
|
|
|
|
with Then("check there are no unhealthy containers"):
|
|
ps_cmd = self.command(None, f"{self.docker_compose} ps | tee | grep -v \"Exit 0\"")
|
|
if "is unhealthy" in cmd.output or "Exit" in ps_cmd.output:
|
|
self.command(None, f"{self.docker_compose} logs | tee")
|
|
continue
|
|
|
|
if cmd.exitcode == 0 and "is unhealthy" not in cmd.output and "Exit" not in ps_cmd.output:
|
|
break
|
|
|
|
if cmd.exitcode != 0 or "is unhealthy" in cmd.output or "Exit" in ps_cmd.output:
|
|
fail("could not bring up docker-compose cluster")
|
|
|
|
with Then("wait all nodes report healthy"):
|
|
for name in self.nodes["clickhouse"]:
|
|
self.node(name).wait_healthy()
|
|
if name.startswith("clickhouse"):
|
|
self.node(name).start_clickhouse()
|
|
|
|
def command(self, node, command, message=None, exitcode=None, steps=True,
|
|
bash=None, no_checks=False, use_error=True, *args, **kwargs):
|
|
"""Execute and check command.
|
|
:param node: name of the service
|
|
:param command: command
|
|
:param message: expected message that should be in the output, default: None
|
|
:param exitcode: expected exitcode, default: None
|
|
:param steps: don't break command into steps, default: True
|
|
"""
|
|
with By("executing command", description=command, format_description=False) if steps else NullStep():
|
|
if bash is None:
|
|
bash = self.bash(node)
|
|
try:
|
|
r = bash(command, *args, **kwargs)
|
|
except ExpectTimeoutError:
|
|
self.close_bash(node)
|
|
raise
|
|
|
|
if no_checks:
|
|
return r
|
|
|
|
if exitcode is not None:
|
|
with Then(f"exitcode should be {exitcode}", format_name=False) if steps else NullStep():
|
|
assert r.exitcode == exitcode, error(r.output)
|
|
|
|
if message is not None:
|
|
with Then(f"output should contain message", description=message, format_description=False) if steps else NullStep():
|
|
assert message in r.output, error(r.output)
|
|
|
|
return r
|