mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-18 04:12:19 +00:00
569 lines
19 KiB
Python
569 lines
19 KiB
Python
import os
|
|
import uuid
|
|
import time
|
|
import xml.etree.ElementTree as xmltree
|
|
import packaging.version as pkg_version
|
|
from collections import namedtuple
|
|
|
|
import testflows.settings as settings
|
|
from testflows.core import *
|
|
from testflows.asserts import error
|
|
from testflows.core.name import basename, parentname
|
|
from testflows._core.testtype import TestSubType
|
|
|
|
|
|
def check_clickhouse_version(version):
    """Build a predicate that compares a test's ClickHouse version to `version`.

    `version` may start with one of the comparison prefixes `==`, `>=`,
    `<=`, `=`, `>` or `<`; a version without a prefix is compared for
    equality.

    :param version: version specification, for example `>=22.3`
    :return: function that takes a test and returns the comparison result;
        it returns `False` when the test context has no `clickhouse_version`
    """
    # Prefixes are tried in this order, so "==", ">=" and "<=" are matched
    # before their one-character counterparts.
    comparisons = (
        ("==", lambda actual, expected: actual == expected),
        (">=", lambda actual, expected: actual >= expected),
        ("<=", lambda actual, expected: actual <= expected),
        ("=", lambda actual, expected: actual == expected),
        (">", lambda actual, expected: actual > expected),
        ("<", lambda actual, expected: actual < expected),
    )

    def check(test):
        if getattr(test.context, "clickhouse_version", None) is None:
            return False

        actual = pkg_version.parse(str(test.context.clickhouse_version))

        for prefix, compare in comparisons:
            if version.startswith(prefix):
                expected = pkg_version.parse(str(version[len(prefix):]))
                return compare(actual, expected)

        return actual == pkg_version.parse(str(version))

    return check
|
|
|
|
|
|
def getuid(with_test_name=False):
    """Return a unique identifier in which dashes are replaced by underscores.

    :param with_test_name: prefix the identifier with the base name of the
        current test (for an example, the base name of its parent), with
        spaces turned into underscores and commas removed, default: `False`
    """
    prefix = ""

    if with_test_name:
        if current().subtype == TestSubType.Example:
            # for an example the name of its parent is used
            name = basename(parentname(current().name))
        else:
            name = basename(current().name)
        prefix = name.replace(" ", "_").replace(",", "") + "_"

    return prefix + str(uuid.uuid1()).replace("-", "_")
|
|
|
|
|
|
@TestStep(Given)
def instrument_clickhouse_server_log(
    self,
    node=None,
    test=None,
    clickhouse_server_log="/var/log/clickhouse-server/clickhouse-server.log",
    always_dump=False,
):
    """Instrument clickhouse-server.log for the current test (default)
    by adding start and end messages that include test name to log
    of the specified node. If we are in the debug mode and the test
    fails then dump the messages from the log for this test.

    :param node: node whose log is instrumented, default: `self.context.node`
    :param test: test whose name is written to the log, default: current test
    :param clickhouse_server_log: path to the server log file on the node
    :param always_dump: always dump clickhouse log after test, default: `False`
    """
    if test is None:
        test = current()

    if node is None:
        node = self.context.node

    with By("getting current log size"):
        cmd = node.command(f"stat --format=%s {clickhouse_server_log}")
        if (
            cmd.output
            == f"stat: cannot stat '{clickhouse_server_log}': No such file or directory"
        ):
            # the log file does not exist yet so everything in it will be new
            start_logsize = 0
        else:
            start_logsize = cmd.output.split(" ")[0].strip()

    try:
        with And("adding test name start message to the clickhouse-server.log"):
            node.command(
                f'echo -e "\\n-- start: {test.name} --\\n" >> {clickhouse_server_log}'
            )
        yield

    finally:
        # skip the end message and the dump when the test is terminating
        if test.terminating is True:
            return

        with Finally(
            "adding test name end message to the clickhouse-server.log", flags=TE
        ):
            node.command(
                f'echo -e "\\n-- end: {test.name} --\\n" >> {clickhouse_server_log}'
            )

        with And("getting current log size at the end of the test"):
            cmd = node.command(f"stat --format=%s {clickhouse_server_log}")
            end_logsize = cmd.output.split(" ")[0].strip()

        dump_log = always_dump or (settings.debug and not self.parent.result)

        if dump_log:
            with Then("dumping clickhouse-server.log for this test"):
                # `tail -c +N` counts bytes from 1, so the first byte written
                # after the initial size was taken is byte number size + 1
                first_byte = int(start_logsize) + 1
                length = int(end_logsize) - int(start_logsize)
                node.command(
                    f"tail -c +{first_byte} {clickhouse_server_log}"
                    f" | head -c {length}"
                )
|
|
|
|
|
|
# XML declaration that create_xml_config_content() puts in front of the serialized tree
xml_with_utf8 = '<?xml version="1.0" encoding="utf-8"?>\n'
|
|
|
|
|
|
def xml_indent(elem, level=0, by=" "):
|
|
i = "\n" + level * by
|
|
if len(elem):
|
|
if not elem.text or not elem.text.strip():
|
|
elem.text = i + by
|
|
if not elem.tail or not elem.tail.strip():
|
|
elem.tail = i
|
|
for elem in elem:
|
|
xml_indent(elem, level + 1)
|
|
if not elem.tail or not elem.tail.strip():
|
|
elem.tail = i
|
|
else:
|
|
if level and (not elem.tail or not elem.tail.strip()):
|
|
elem.tail = i
|
|
|
|
|
|
def xml_append(root, tag, text):
    """Append a new `tag` element with the given `text` to `root`.

    :param root: parent element
    :param tag: name of the new element
    :param text: text of the new element
    :return: the new element
    """
    child = xmltree.SubElement(root, tag)
    child.text = text
    return child
|
|
|
|
|
|
class Config:
    """Description of a configuration file.

    :param content: text of the configuration file
    :param path: path the configuration file is written to
    :param name: name of the configuration file
    :param uid: unique identifier of the configuration
    :param preprocessed_name: name of the preprocessed configuration file
    """

    def __init__(self, content, path, name, uid, preprocessed_name):
        self.content, self.path, self.name = content, path, name
        self.uid, self.preprocessed_name = uid, preprocessed_name
|
|
|
|
|
|
class KeyWithAttributes:
    """XML key that carries attributes in addition to its name."""

    def __init__(self, name, attributes):
        """Create XML key with attributes.

        :param name: key name
        :param attributes: dictionary of attributes {name: value, ...}
        """
        # keep a private copy so later changes made by the caller are not seen
        self.attributes = dict(attributes)
        self.name = name
|
|
|
|
|
|
def create_xml_config_content(
    entries, config_file, config_d_dir="/etc/clickhouse-server/config.d"
):
    """Create XML configuration file from a dictionary.

    Keys are either element names or `KeyWithAttributes` objects. A value
    that is a dictionary defines child elements, a value that is a list or
    a tuple must contain dictionaries that all define children of the same
    element, and any other value becomes the text of the element.

    :param entries: dictionary that defines xml
    :param config_file: name of the config file
    :param config_d_dir: config.d directory path, default: `/etc/clickhouse-server/config.d`
    :return: `Config` object with the generated content
    """
    uid = getuid()
    path = os.path.join(config_d_dir, config_file)
    name = config_file
    root = xmltree.Element("clickhouse")
    # the uid comment is what add_config() greps for in the preprocessed config
    root.append(xmltree.Comment(text=f"config uid: {uid}"))

    def create_xml_tree(entries, root):
        for key, value in entries.items():
            if isinstance(key, KeyWithAttributes):
                xml_element = xmltree.Element(key.name)
                for attr_name, attr_value in key.attributes.items():
                    xml_element.set(attr_name, attr_value)
            else:
                xml_element = xmltree.Element(key)

            # isinstance() also accepts subclasses such as OrderedDict
            if isinstance(value, dict):
                create_xml_tree(value, xml_element)
            elif isinstance(value, (list, tuple)):
                for item in value:
                    create_xml_tree(item, xml_element)
            else:
                xml_element.text = value

            root.append(xml_element)

    create_xml_tree(entries, root)
    xml_indent(root)
    content = xml_with_utf8 + str(
        xmltree.tostring(root, short_empty_elements=False, encoding="utf-8"), "utf-8"
    )

    return Config(content, path, name, uid, "config.xml")
|
|
|
|
|
|
def add_invalid_config(
    config, message, recover_config=None, tail=30, timeout=300, restart=True, user=None
):
    """Check that ClickHouse errors when trying to load invalid configuration file.

    :param config: invalid configuration file description
    :param message: text expected in the tail of clickhouse-server.err.log
    :param recover_config: configuration whose content replaces the invalid
        file during cleanup; when `None` the invalid file is only removed
    :param tail: number of last lines of the error log to search, default: 30
    :param timeout: timeout in sec used for each wait and cleanup command, default: 300
    :param restart: restart ClickHouse after adding the config and after cleanup, default: True
    :param user: user passed to `node.restart_clickhouse`, default: None
    """
    cluster = current().context.cluster
    node = current().context.node

    try:
        with Given("I prepare the error log by writing empty lines into it"):
            node.command(
                'echo -e "%s" > /var/log/clickhouse-server/clickhouse-server.err.log'
                % ("-\\n" * tail)
            )

        with When("I add the config", description=config.path):
            command = f"cat <<HEREDOC > {config.path}\n{config.content}\nHEREDOC"
            node.command(command, steps=False, exitcode=0)

        with Then(
            f"{config.preprocessed_name} should be updated",
            description=f"timeout {timeout}",
        ):
            started = time.time()
            command = f"cat /var/lib/clickhouse/preprocessed_configs/{config.preprocessed_name} | grep {config.uid}{' > /dev/null' if not settings.debug else ''}"
            # defined up front so the assertion fails cleanly, instead of
            # raising NameError, when the loop body never runs
            exitcode = None
            while time.time() - started < timeout:
                exitcode = node.command(command, steps=False).exitcode
                if exitcode == 0:
                    break
                time.sleep(1)
            assert exitcode == 0, error()

        if restart:
            with When("I restart ClickHouse to apply the config changes"):
                node.restart_clickhouse(safe=False, wait_healthy=False, user=user)

    finally:
        if recover_config is None:
            with Finally(f"I remove {config.name}"):
                with By("removing invalid configuration file"):
                    system_config_path = os.path.join(
                        cluster.environ["CLICKHOUSE_TESTS_DIR"],
                        "configs",
                        node.name,
                        "config.d",
                        config.path.split("config.d/")[-1],
                    )
                    cluster.command(
                        None,
                        f"rm -rf {system_config_path}",
                        timeout=timeout,
                        exitcode=0,
                    )

                if restart:
                    with And("restarting ClickHouse"):
                        # NOTE(review): the restart is issued twice here; kept
                        # as is, confirm whether the second one is intentional
                        node.restart_clickhouse(safe=False, user=user)
                        node.restart_clickhouse(safe=False, user=user)
        else:
            with Finally(f"I change {config.name}"):
                with By("changing invalid configuration file"):
                    system_config_path = os.path.join(
                        cluster.environ["CLICKHOUSE_TESTS_DIR"],
                        "configs",
                        node.name,
                        "config.d",
                        config.path.split("config.d/")[-1],
                    )
                    cluster.command(
                        None,
                        f"rm -rf {system_config_path}",
                        timeout=timeout,
                        exitcode=0,
                    )
                    command = f"cat <<HEREDOC > {system_config_path}\n{recover_config.content}\nHEREDOC"
                    cluster.command(None, command, timeout=timeout, exitcode=0)

                if restart:
                    with And("restarting ClickHouse"):
                        node.restart_clickhouse(safe=False, user=user)

    with Then("error log should contain the expected error message"):
        started = time.time()
        command = f'tail -n {tail} /var/log/clickhouse-server/clickhouse-server.err.log | grep "{message}"'
        # same guard as above for a loop that never runs
        exitcode = None
        while time.time() - started < timeout:
            exitcode = node.command(command, steps=False).exitcode
            if exitcode == 0:
                break
            time.sleep(1)
        assert exitcode == 0, error()
|
|
|
|
|
|
def add_config(
    config,
    timeout=300,
    restart=False,
    modify=False,
    node=None,
    user=None,
    wait_healthy=True,
    check_preprocessed=True,
):
    """Add dynamic configuration file to ClickHouse.

    This is a generator: the config is added before the `yield` and, unless
    `modify` is set, removed again after it.

    :param config: configuration file description
    :param timeout: timeout, default: 300 sec
    :param restart: restart server, default: False
    :param modify: only modify configuration file, default: False
    :param node: node to add the config to, default: node of the current test context
    :param user: user passed to `node.start_clickhouse` when the server is
        restarted after adding the config, default: None
    :param wait_healthy: passed to `node.start_clickhouse` on restart, default: True
    :param check_preprocessed: check that the preprocessed config contains
        the uid of the config after it is added, default: True
    """
    if node is None:
        node = current().context.node
    cluster = current().context.cluster

    def check_preprocessed_config_is_updated(after_removal=False):
        """Check that preprocessed config is updated."""
        started = time.time()
        command = f"cat /var/lib/clickhouse/preprocessed_configs/{config.preprocessed_name} | grep {config.uid}{' > /dev/null' if not settings.debug else ''}"

        # defined up front so the assertions below fail cleanly, instead of
        # raising NameError, when the loop body never runs
        exitcode = None
        while time.time() - started < timeout:
            exitcode = node.command(command, steps=False).exitcode
            if after_removal:
                # grep exits with 1 once the uid is no longer present
                if exitcode == 1:
                    break
            else:
                if exitcode == 0:
                    break
            time.sleep(1)

        if settings.debug:
            node.command(
                f"cat /var/lib/clickhouse/preprocessed_configs/{config.preprocessed_name}"
            )

        if after_removal:
            assert exitcode == 1, error()
        else:
            assert exitcode == 0, error()

    def wait_for_config_to_be_loaded(user=None):
        """Wait for config to be loaded."""
        # `bash` is the shell opened by the enclosing function around this call
        if restart:
            with When("I close terminal to the node to be restarted"):
                bash.close()

            with And("I stop ClickHouse to apply the config changes"):
                node.stop_clickhouse(safe=False)

            with And("I get the current log size"):
                cmd = node.cluster.command(
                    None,
                    f"stat --format=%s {cluster.environ['CLICKHOUSE_TESTS_DIR']}/_instances/{node.name}/logs/clickhouse-server.log",
                )
                logsize = cmd.output.split(" ")[0].strip()

            with And("I start ClickHouse back up"):
                node.start_clickhouse(user=user, wait_healthy=wait_healthy)

            with Then("I tail the log file from using previous log size as the offset"):
                bash.prompt = bash.__class__.prompt
                bash.open()
                bash.send(
                    f"tail -c +{logsize} -f /var/log/clickhouse-server/clickhouse-server.log"
                )

        with Then("I wait for config reload message in the log file"):
            if restart:
                bash.expect(
                    "ConfigReloader: Loaded config '/etc/clickhouse-server/config.xml', performed update on configuration",
                    timeout=timeout,
                )
            else:
                bash.expect(
                    f"ConfigReloader: Loaded config '/etc/clickhouse-server/{config.preprocessed_name}', performed update on configuration",
                    timeout=timeout,
                )

    try:
        with Given(f"{config.name}"):
            if settings.debug:
                with When("I output the content of the config"):
                    debug(config.content)

            with node.cluster.shell(node.name) as bash:
                bash.expect(bash.prompt)
                bash.send(
                    "tail -v -n 0 -f /var/log/clickhouse-server/clickhouse-server.log"
                )
                # make sure tail process is launched and started to follow the file
                bash.expect("<==")
                bash.expect("\n")

                with When("I add the config", description=config.path):
                    command = (
                        f"cat <<HEREDOC > {config.path}\n{config.content}\nHEREDOC"
                    )
                    node.command(command, steps=False, exitcode=0)

                if check_preprocessed:
                    with Then(
                        f"{config.preprocessed_name} should be updated",
                        description=f"timeout {timeout}",
                    ):
                        check_preprocessed_config_is_updated()

                with And("I wait for config to be reloaded"):
                    wait_for_config_to_be_loaded(user=user)

        yield
    finally:
        if not modify:
            with Finally(f"I remove {config.name} on {node.name}"):
                with node.cluster.shell(node.name) as bash:
                    bash.expect(bash.prompt)
                    bash.send(
                        "tail -v -n 0 -f /var/log/clickhouse-server/clickhouse-server.log"
                    )
                    # make sure tail process is launched and started to follow the file
                    bash.expect("<==")
                    bash.expect("\n")

                    with By("removing the config file", description=config.path):
                        node.command(f"rm -rf {config.path}", exitcode=0)

                    with Then(
                        f"{config.preprocessed_name} should be updated",
                        description=f"timeout {timeout}",
                    ):
                        check_preprocessed_config_is_updated(after_removal=True)

                    with And("I wait for config to be reloaded"):
                        # NOTE(review): `user` is not forwarded here, so a
                        # restart during cleanup starts ClickHouse with
                        # user=None; confirm that this is intended
                        wait_for_config_to_be_loaded()
|
|
|
|
|
|
@TestStep(When)
def copy(
    self,
    dest_node,
    src_path,
    dest_path,
    bash=None,
    binary=False,
    eof="EOF",
    src_node=None,
):
    """Copy file from source to destination node.

    The file is read with `cat` and written on the destination through a
    here-document.

    :param dest_node: node to write the file on
    :param src_path: path of the file to read
    :param dest_path: path of the file to write
    :param bash: shell used to read the source file, default: shell obtained
        from `self.context.cluster.bash(node=src_node)`
    :param binary: copy a binary file (not implemented), default: False
    :param eof: here-document delimiter, default: `EOF`
    :param src_node: node passed to `cluster.bash` to read the source file, default: None
    :raises NotImplementedError: when `binary` is set
    """
    if binary:
        raise NotImplementedError("not yet implemented; need to use base64 encoding")

    # use the shell supplied by the caller instead of always replacing it
    if bash is None:
        bash = self.context.cluster.bash(node=src_node)

    cmd = bash(f"cat {src_path}")

    assert cmd.exitcode == 0, error()
    contents = cmd.output

    dest_node.command(f"cat << {eof} > {dest_path}\n{contents}\n{eof}")
|
|
|
|
|
|
@TestStep(Given)
def add_user_to_group_on_node(
    self, node=None, group="clickhouse", username="clickhouse"
):
    """Add user {username} into group {group}.

    :param node: node to run the command on, default: `self.context.node`
    :param group: group name, default: `clickhouse`
    :param username: user name, default: `clickhouse`
    """
    if node is None:
        node = self.context.node

    # the command is `usermod`; the former `usermode` spelling is not a
    # standard user management command
    node.command(f"usermod -g {group} {username}", exitcode=0)
|
|
|
|
|
|
@TestStep(Given)
def change_user_on_node(self, node=None, username="clickhouse"):
    """Switch to another user on the node and exit again during cleanup.

    :param node: node to run the commands on, default: `self.context.node`
    :param username: user to switch to, default: `clickhouse`
    """
    target = self.context.node if node is None else node
    switch_user = f"su {username}"

    try:
        target.command(switch_user, exitcode=0)
        yield
    finally:
        target.command("exit", exitcode=0)
|
|
|
|
|
|
@TestStep(Given)
def add_user_on_node(self, node=None, groupname=None, username="clickhouse"):
    """Create a user on the node, optionally in a group, and delete it during cleanup.

    :param node: node to run the commands on, default: `self.context.node`
    :param groupname: group passed to `useradd -g`, default: None (no `-g` option)
    :param username: name of the user, default: `clickhouse`
    """
    target = self.context.node if node is None else node

    try:
        group_option = "" if groupname is None else f"-g {groupname} "
        target.command(
            f"useradd {group_option}-s /bin/bash {username}", exitcode=0
        )
        yield
    finally:
        target.command(f"deluser {username}", exitcode=0)
|
|
|
|
|
|
@TestStep(Given)
def add_group_on_node(self, node=None, groupname="clickhouse"):
    """Create group on node and delete it during cleanup.

    :param node: node to run the commands on, default: `self.context.node`
    :param groupname: name of the group, default: `clickhouse`
    """
    if node is None:
        node = self.context.node
    try:
        node.command(f"groupadd {groupname}", exitcode=0)
        yield
    finally:
        # delete the group that was created, not a hard-coded name
        node.command(f"delgroup {groupname}")
|
|
|
|
|
|
@TestStep(Given)
def create_file_on_node(self, path, content, node=None):
    """Write a file on the node, yield its path and remove it during cleanup.

    :param path: file path
    :param content: file content
    :param node: node to run the commands on, default: `self.context.node`
    """
    target = self.context.node if node is None else node

    try:
        with By(f"creating file {path}"):
            write_file = f"cat <<HEREDOC > {path}\n{content}\nHEREDOC"
            target.command(write_file, exitcode=0)
        yield path
    finally:
        with Finally(f"I remove {path}"):
            remove_file = f"rm -rf {path}"
            target.command(remove_file, exitcode=0)
|
|
|
|
|
|
@TestStep(Given)
def set_envs_on_node(self, envs, node=None):
    """Export environment variables on the node and unset them during cleanup.

    :param envs: dictionary of env variables key=value
    :param node: node to run the commands on, default: `self.context.node`
    """
    target = self.context.node if node is None else node

    try:
        with By("setting envs"):
            for name, value in envs.items():
                target.command(f"export {name}={value}", exitcode=0)
        yield
    finally:
        with Finally("I unset envs"):
            for name in envs:
                target.command(f"unset {name}", exitcode=0)
|