improvements

Yatsishin Ilya 2022-07-07 20:19:15 +00:00
parent b0f8970d07
commit d274b05fac
14 changed files with 214 additions and 169 deletions

View File

@@ -62,7 +62,7 @@ def pre_build(repo_path: str, env_variables: List[str]):
         f"git -C {repo_path} fetch --no-recurse-submodules "
         "--no-tags origin master:master"
     )
-    logging.info("Getting master branch for performance artifact: ''%s'", cmd)
+    logging.info("Getting master branch for performance artifact: '%s'", cmd)
     subprocess.check_call(cmd, shell=True)

View File

@@ -237,14 +237,22 @@ def enable_consistent_hash_plugin(rabbitmq_id):
     return p.returncode == 0


-def get_instances_dir():
-    if (
-        "INTEGRATION_TESTS_RUN_ID" in os.environ
-        and os.environ["INTEGRATION_TESTS_RUN_ID"]
-    ):
-        return "_instances_" + shlex.quote(os.environ["INTEGRATION_TESTS_RUN_ID"])
-    else:
-        return "_instances"
+def get_instances_dir(name):
+    instances_dir_name = "_instances"
+
+    worker_name = os.environ.get("PYTEST_XDIST_WORKER", "")
+    run_id = os.environ.get("INTEGRATION_TESTS_RUN_ID", "")
+
+    if worker_name:
+        instances_dir_name += "_" + worker_name
+    if name:
+        instances_dir_name += "_" + name
+    if run_id:
+        instances_dir_name += "_" + shlex.quote(run_id)
+
+    return instances_dir_name


 class ClickHouseCluster:
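A minimal sketch of how the new directory naming composes, assuming get_instances_dir is imported from helpers.cluster as it is elsewhere in this commit; the worker and run-id values are made up for illustration:

    import os
    from helpers.cluster import get_instances_dir

    # Hypothetical environment: pytest-xdist worker "gw3", CI run id "12345".
    os.environ["PYTEST_XDIST_WORKER"] = "gw3"
    os.environ["INTEGRATION_TESTS_RUN_ID"] = "12345"

    print(get_instances_dir(""))            # _instances_gw3_12345
    print(get_instances_dir("my_cluster"))  # _instances_gw3_my_cluster_12345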
@@ -270,6 +278,7 @@ class ClickHouseCluster:
         zookeeper_keyfile=None,
         zookeeper_certfile=None,
     ):
+        logging.debug(f"INIT CALLED")
         for param in list(os.environ.keys()):
             logging.debug("ENV %40s %s" % (param, os.environ[param]))
         self.base_path = base_path

@@ -306,19 +315,8 @@ class ClickHouseCluster:
         )
         # docker-compose removes everything non-alphanumeric from project names so we do it too.
         self.project_name = re.sub(r"[^a-z0-9]", "", project_name.lower())
-        instances_dir_name = "_instances"
-        if self.name:
-            instances_dir_name += "_" + self.name
-        if (
-            "INTEGRATION_TESTS_RUN_ID" in os.environ
-            and os.environ["INTEGRATION_TESTS_RUN_ID"]
-        ):
-            instances_dir_name += "_" + shlex.quote(
-                os.environ["INTEGRATION_TESTS_RUN_ID"]
-            )
-        self.instances_dir = p.join(self.base_dir, instances_dir_name)
+        self.instances_dir_name = get_instances_dir(self.name)
+        self.instances_dir = p.join(self.base_dir, self.instances_dir_name)
         self.docker_logs_path = p.join(self.instances_dir, "docker.log")
         self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
         self.env_variables = {}
@@ -536,8 +534,37 @@ class ClickHouseCluster:
         self.is_up = False
         self.env = os.environ.copy()
         logging.debug(f"CLUSTER INIT base_config_dir:{self.base_config_dir}")
+        if p.exists(self.instances_dir):
+            shutil.rmtree(self.instances_dir, ignore_errors=True)
+            logging.debug(f"Removed :{self.instances_dir}")
+        os.mkdir(self.instances_dir)
+
+    def print_all_docker_pieces(self):
+        res_networks = subprocess.check_output(
+            f"docker network ls --filter name='{self.project_name}*'",
+            shell=True,
+            universal_newlines=True,
+        )
+        logging.debug(f"Docker networks for project {self.project_name} are {res_networks}")
+        res_containers = subprocess.check_output(
+            f"docker container ls -a --filter name='{self.project_name}*'",
+            shell=True,
+            universal_newlines=True,
+        )
+        logging.debug(f"Docker containers for project {self.project_name} are {res_containers}")
+        res_volumes = subprocess.check_output(
+            f"docker volume ls --filter name='{self.project_name}*'",
+            shell=True,
+            universal_newlines=True,
+        )
+        logging.debug(f"Docker volumes for project {self.project_name} are {res_volumes}")

     def cleanup(self):
+        logging.debug("Cleanup called")
+        self.print_all_docker_pieces()
         if (
             os.environ
             and "DISABLE_CLEANUP" in os.environ
@@ -549,12 +576,16 @@ class ClickHouseCluster:
         # Just in case kill unstopped containers from previous launch
         try:
             unstopped_containers = self.get_running_containers()
-            if unstopped_containers:
+            logging.debug(f"Unstopped containers: {unstopped_containers}")
+            if len(unstopped_containers):
                 logging.debug(
                     f"Trying to kill unstopped containers: {unstopped_containers}"
                 )
                 for id in unstopped_containers:
-                    run_and_check(f"docker kill {id}", shell=True, nothrow=True)
+                    try:
+                        run_and_check(f"docker kill {id}", shell=True, nothrow=True)
+                    except:
+                        pass
                     run_and_check(f"docker rm {id}", shell=True, nothrow=True)
                 unstopped_containers = self.get_running_containers()
                 if unstopped_containers:

@@ -563,26 +594,33 @@ class ClickHouseCluster:
                 logging.debug(f"Unstopped containers killed.")
             else:
                 logging.debug(f"No running containers for project: {self.project_name}")
+        except Exception as ex:
+            logging.debug(f"Got exception removing containers {str(ex)}")
+
+        # # Just in case remove unused networks
+        try:
+            logging.debug("Trying to prune unused networks...")
+            list_networks = subprocess.check_output(
+                f"docker network ls -q --filter name='{self.project_name}'",
+                shell=True,
+                universal_newlines=True,
+            ).splitlines()
+            if list_networks:
+                logging.debug(f"Trying to remove networks: {list_networks}")
+                subprocess.check_call(f"docker network rm {' '.join(list_networks)}", shell=True)
+                logging.debug(f"Networks removed: {list_networks}")
         except:
             pass
-        # # Just in case remove unused networks
-        # try:
-        #     logging.debug("Trying to prune unused networks...")
-        #     run_and_check(['docker', 'network', 'prune', '-f'])
-        #     logging.debug("Networks pruned")
-        # except:
-        #     pass

         # Remove unused images
-        # try:
-        #     logging.debug("Trying to prune unused images...")
-        #     run_and_check(['docker', 'image', 'prune', '-f'])
-        #     logging.debug("Images pruned")
-        # except:
-        #     pass
+        try:
+            logging.debug("Trying to prune unused images...")
+            run_and_check(['docker', 'image', 'prune', '-f'])
+            logging.debug("Images pruned")
+        except:
+            pass

         # Remove unused volumes
         try:
@@ -626,7 +664,7 @@ class ClickHouseCluster:
             shell=True,
         )
         containers = dict(
-            line.split(":", 1) for line in containers.decode("utf8").splitlines()
+            line.split(":", 1) for line in containers.splitlines()
         )
         return containers
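The dropped .decode("utf8") suggests the underlying check_output call now returns text. A standalone sketch of the same name:id parsing under that assumption; the --format template here is illustrative, not the helper's real command:

    import subprocess

    # With universal_newlines=True, check_output returns str, so no .decode() is needed.
    out = subprocess.check_output(
        "docker container ls -a --format '{{.Names}}:{{.ID}}'",
        shell=True,
        universal_newlines=True,
    )
    containers = dict(line.split(":", 1) for line in out.splitlines())
    print(containers)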
@@ -1767,7 +1805,7 @@ class ClickHouseCluster:
                 errors += [str(ex)]
                 time.sleep(0.5)

-        run_and_check(["docker-compose", "ps", "--services", "--all"])
+        run_and_check(["docker", "ps", "--all"])
         logging.error("Can't connect to MySQL:{}".format(errors))
         raise Exception("Cannot wait MySQL container")

@@ -1789,7 +1827,7 @@ class ClickHouseCluster:
                 logging.debug("Can't connect to MySQL 8 " + str(ex))
                 time.sleep(0.5)

-        run_and_check(["docker-compose", "ps", "--services", "--all"])
+        run_and_check(["docker", "ps", "--all"])
         raise Exception("Cannot wait MySQL 8 container")

     def wait_mysql_cluster_to_start(self, timeout=180):

@@ -1814,7 +1852,7 @@ class ClickHouseCluster:
                 errors += [str(ex)]
                 time.sleep(0.5)

-        run_and_check(["docker-compose", "ps", "--services", "--all"])
+        run_and_check(["docker", "ps", "--all"])
         logging.error("Can't connect to MySQL:{}".format(errors))
         raise Exception("Cannot wait MySQL container")

@@ -2087,7 +2125,7 @@ class ClickHouseCluster:
                 logging.debug("Can't connect to MeiliSearch " + str(ex))
                 time.sleep(1)

-    def wait_minio_to_start(self, timeout=180, secure=False):
+    def wait_minio_to_start(self, timeout=10, secure=False):
         self.minio_ip = self.get_instance_ip(self.minio_host)
         self.minio_redirect_ip = self.get_instance_ip(self.minio_redirect_host)
@@ -2129,6 +2167,14 @@ class ClickHouseCluster:
                 logging.debug("Can't connect to Minio: %s", str(ex))
                 time.sleep(1)

+        try:
+            with open(os.path.join(self.minio_dir, "docker.log"), "w+") as f:
+                subprocess.check_call(  # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
+                    self.base_minio_cmd + ["logs"], stdout=f
+                )
+        except Exception as e:
+            logging.debug("Unable to get logs from docker.")
+
         raise Exception("Can't wait Minio to start")

     def wait_azurite_to_start(self, timeout=180):
@@ -2199,15 +2245,13 @@ class ClickHouseCluster:
         raise Exception("Can't wait Cassandra to start")

-    def start(self, destroy_dirs=True):
+    def start(self):
         pytest_xdist_logging_to_separate_files.setup()
         logging.info("Running tests in {}".format(self.base_path))
-        logging.debug(
-            "Cluster start called. is_up={}, destroy_dirs={}".format(
-                self.is_up, destroy_dirs
-            )
-        )
+        logging.debug(f"Cluster start called. is_up={self.is_up}")
+        self.print_all_docker_pieces()

         if self.is_up:
             return

@@ -2217,23 +2261,9 @@ class ClickHouseCluster:
             logging.warning("Cleanup failed:{e}")

         try:
-            # clickhouse_pull_cmd = self.base_cmd + ['pull']
-            # print(f"Pulling images for {self.base_cmd}")
-            # retry_exception(10, 5, subprocess_check_call, Exception, clickhouse_pull_cmd)
-
-            if destroy_dirs and p.exists(self.instances_dir):
-                logging.debug(f"Removing instances dir {self.instances_dir}")
-                shutil.rmtree(self.instances_dir)
-
             for instance in list(self.instances.values()):
-                logging.debug(
-                    (
-                        "Setup directory for instance: {} destroy_dirs: {}".format(
-                            instance.name, destroy_dirs
-                        )
-                    )
-                )
-                instance.create_dir(destroy_dir=destroy_dirs)
+                logging.debug(f"Setup directory for instance: {instance.name}")
+                instance.create_dir()

             _create_env_file(os.path.join(self.env_file), self.env_variables)
             self.docker_client = docker.DockerClient(
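With directory cleanup moved into __init__, the caller-facing lifecycle simplifies. A rough usage sketch under that assumption (not a test file from this commit):

    from helpers.cluster import ClickHouseCluster

    cluster = ClickHouseCluster(__file__)  # wipes and recreates its _instances_* dir on construction
    node = cluster.add_instance("node")
    try:
        cluster.start()  # no destroy_dirs argument anymore
    finally:
        cluster.shutdown()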
@@ -2627,13 +2657,9 @@ class ClickHouseCluster:
     def pause_container(self, instance_name):
         subprocess_check_call(self.base_cmd + ["pause", instance_name])

-    #     subprocess_check_call(self.base_cmd + ['kill', '-s SIGSTOP', instance_name])
-
     def unpause_container(self, instance_name):
         subprocess_check_call(self.base_cmd + ["unpause", instance_name])

-    #     subprocess_check_call(self.base_cmd + ['kill', '-s SIGCONT', instance_name])
-
     def open_bash_shell(self, instance_name):
         os.system(" ".join(self.base_cmd + ["exec", instance_name, "/bin/bash"]))

@@ -3687,14 +3713,9 @@ class ClickHouseInstance:
             ["bash", "-c", f"sed -i 's/{replace}/{replacement}/g' {path_to_config}"]
         )

-    def create_dir(self, destroy_dir=True):
+    def create_dir(self):
         """Create the instance directory and all the needed files there."""

-        if destroy_dir:
-            self.destroy_dir()
-        elif p.exists(self.path):
-            return
-
         os.makedirs(self.path)

         instance_config_dir = p.abspath(p.join(self.path, "configs"))

@@ -3953,10 +3974,6 @@ class ClickHouseInstance:
             )
         )

-    def destroy_dir(self):
-        if p.exists(self.path):
-            shutil.rmtree(self.path)
-
     def wait_for_path_exists(self, path, seconds):
         while seconds > 0:
             seconds -= 1

View File

@@ -1,5 +1,5 @@
 [pytest]
-python_files = test*.py
+python_files = test_*/test*.py
 norecursedirs = _instances*
 timeout = 900
 junit_duration_report = call

View File

@@ -383,6 +383,7 @@ if __name__ == "__main__":
         --volume=/run:/run/host:ro \
         {dockerd_internal_volume} -e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 \
         -e XTABLES_LOCKFILE=/run/host/xtables.lock \
+        -e PYTHONUNBUFFERED=1 \
         {env_tags} {env_cleanup} -e PYTEST_OPTS='{parallel} {opts} {tests_list} -vvv' {img} {command}".format(
         net=net,
         tty=tty,

View File

@@ -26,9 +26,6 @@
                 <ip>127.0.0.1</ip>
                 <host>clientA1.com</host>
                 <host>clientA3.com</host>
-                <host_regexp>clientB\d+\.ru</host_regexp>
-                <host_regexp>clientC\d+\.ru$</host_regexp>
-                <host_regexp>^clientD\d+\.ru$</host_regexp>
             </networks>
         </default>
     </users>

View File

@@ -1,4 +1,5 @@
 import pytest
+import logging
 from helpers.cluster import ClickHouseCluster

 cluster = ClickHouseCluster(__file__)

@@ -7,16 +8,6 @@ server = cluster.add_instance("server", user_configs=["configs/users.d/network.x
 clientA1 = cluster.add_instance("clientA1", hostname="clientA1.com")
 clientA2 = cluster.add_instance("clientA2", hostname="clientA2.com")
 clientA3 = cluster.add_instance("clientA3", hostname="clientA3.com")
-clientB1 = cluster.add_instance("clientB1", hostname="clientB001.ru")
-clientB2 = cluster.add_instance("clientB2", hostname="clientB002.ru")
-clientB3 = cluster.add_instance("clientB3", hostname="xxx.clientB003.rutracker.com")
-clientC1 = cluster.add_instance("clientC1", hostname="clientC01.ru")
-clientC2 = cluster.add_instance("clientC2", hostname="xxx.clientC02.ru")
-clientC3 = cluster.add_instance("clientC3", hostname="xxx.clientC03.rutracker.com")
-clientD1 = cluster.add_instance("clientD1", hostname="clientD0001.ru")
-clientD2 = cluster.add_instance("clientD2", hostname="xxx.clientD0002.ru")
-clientD3 = cluster.add_instance("clientD3", hostname="clientD0003.ru")


 def check_clickhouse_is_ok(client_node, server_node):
     assert (

@@ -29,6 +20,13 @@ def check_clickhouse_is_ok(client_node, server_node):

 def query_from_one_node_to_another(client_node, server_node, query):
     check_clickhouse_is_ok(client_node, server_node)
+
+    res1 = client_node.exec_in_container(["ip", "address", "show"])
+    res2 = client_node.exec_in_container(["host", "clientA1.com"])
+    res3 = client_node.exec_in_container(["host", "clientA2.com"])
+    res4 = client_node.exec_in_container(["host", "clientA3.com"])
+    logging.debug(f"IP: {res1}, A1 {res2}, A2 {res3}, A3 {res4}")
+
     return client_node.exec_in_container(
         [
             "bash",
@@ -55,6 +53,13 @@ def setup_nodes():
         )
         query(server, "INSERT INTO test_allowed_client_hosts VALUES (5)")

+        s = query(server, "SELECT fqdn(), hostName()")
+        a1 = query(clientA1, "SELECT fqdn(), hostName()")
+        a2 = query(clientA2, "SELECT fqdn(), hostName()")
+        a3 = query(clientA3, "SELECT fqdn(), hostName()")
+        logging.debug(f"s:{s}, a1:{a1}, a2:{a2}, a3:{a3}")
+
         yield cluster

     finally:

@@ -63,7 +68,6 @@ def setup_nodes():

 def test_allowed_host():
     expected_to_pass = [clientA1, clientA3]
-    expected_to_fail = [clientA2]

     # Reverse DNS lookup currently isn't working as expected in this test.
     # For example, it gives something like "vitbartestallowedclienthosts_clientB1_1.vitbartestallowedclienthosts_default" instead of "clientB001.ru".

@@ -79,6 +83,9 @@ def test_allowed_host():
             == "5\n"
         )

+
+def test_denied_host():
+    expected_to_fail = [clientA2]
     for client_node in expected_to_fail:
         with pytest.raises(Exception, match=r"default: Authentication failed"):
             query_from_one_node_to_another(

View File

@@ -49,6 +49,8 @@ def test_config_with_hosts(start_cluster):
     assert "not allowed" in node1.query_and_get_error(
         "CREATE TABLE table_test_1_4 (word String) Engine=URL('https://yandex2.ru', CSV)"
     )
+    node1.query("DROP TABLE table_test_1_1")
+    node1.query("DROP TABLE table_test_1_2")


 def test_config_with_only_primary_hosts(start_cluster):

@@ -86,6 +88,11 @@ def test_config_with_only_primary_hosts(start_cluster):
         "CREATE TABLE table_test_2_6 (word String) Engine=URL('https://yandex2.ru', CSV)"
     )

+    node2.query("DROP TABLE table_test_2_1")
+    node2.query("DROP TABLE table_test_2_2")
+    node2.query("DROP TABLE table_test_2_3")
+    node2.query("DROP TABLE table_test_2_4")
+

 def test_config_with_only_regexp_hosts(start_cluster):
     assert (

@@ -106,6 +113,8 @@ def test_config_with_only_regexp_hosts(start_cluster):
     assert "not allowed" in node3.query_and_get_error(
         "CREATE TABLE table_test_3_4 (word String) Engine=URL('https://yandex2.ru', CSV)"
     )
+    node3.query("DROP TABLE table_test_3_1")
+    node3.query("DROP TABLE table_test_3_2")


 def test_config_without_allowed_hosts_section(start_cluster):

@@ -139,6 +148,11 @@ def test_config_without_allowed_hosts_section(start_cluster):
         )
         == ""
     )
+    node4.query("DROP TABLE table_test_4_1")
+    node4.query("DROP TABLE table_test_4_2")
+    node4.query("DROP TABLE table_test_4_3")
+    node4.query("DROP TABLE table_test_4_4")
+    node4.query("DROP TABLE table_test_4_5")


 def test_config_without_allowed_hosts(start_cluster):

@@ -267,6 +281,7 @@ def test_redirect(start_cluster):
     assert "not allowed" in node7.query_and_get_error(
         "SET max_http_get_redirects=1; SELECT * from table_test_7_1"
     )
+    node7.query("DROP TABLE table_test_7_1")


 def test_HDFS(start_cluster):

View File

@@ -1,14 +1,14 @@
 import os
 import shutil
 import pytest
+import logging

 from helpers.cluster import ClickHouseCluster
 from helpers.dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
 from helpers.external_sources import SourceRedis

-cluster = None
+cluster = ClickHouseCluster(__file__)
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
 dict_configs_path = os.path.join(SCRIPT_DIR, "configs/dictionaries")
-node = None

 KEY_FIELDS = {
     "simple": [Field("KeyField", "UInt64", is_key=True, default_value_for_get=9999999)],

@@ -70,8 +70,6 @@ DICTIONARIES = []


 def get_dict(source, layout, fields, suffix_name=""):
-    global dict_configs_path
-
     structure = DictionaryStructure(layout, fields)
     dict_name = source.name + "_" + layout.name + "_" + suffix_name
     dict_path = os.path.join(dict_configs_path, dict_name + ".xml")

@@ -82,13 +80,9 @@ def get_dict(source, layout, fields, suffix_name=""):
     return dictionary


-def setup_module(module):
+def generate_dict_configs():
     global DICTIONARIES
     global cluster
-    global node
-    global dict_configs_path
-
-    cluster = ClickHouseCluster(__file__)

     if os.path.exists(dict_configs_path):
         shutil.rmtree(dict_configs_path)

@@ -126,9 +120,7 @@ def setup_module(module):
     for source in sources:
         for layout in LAYOUTS:
             if not source.compatible_with_layout(layout):
-                print(
-                    "Source", source.name, "incompatible with layout", layout.name
-                )
+                logging.debug(f"Source {source.name} incompatible with layout {layout.name}")
                 continue

             fields = KEY_FIELDS[layout.layout_type] + [field]
@@ -137,7 +129,9 @@ def setup_module(module):
     main_configs = []
     dictionaries = []
     for fname in os.listdir(dict_configs_path):
-        dictionaries.append(os.path.join(dict_configs_path, fname))
+        path = os.path.join(dict_configs_path, fname)
+        logging.debug(f"Found dictionary {path}")
+        dictionaries.append(path)

     node = cluster.add_instance(
         "node", main_configs=main_configs, dictionaries=dictionaries, with_redis=True

@@ -147,13 +141,15 @@ def setup_module(module):
 @pytest.fixture(scope="module", autouse=True)
 def started_cluster():
     try:
+        generate_dict_configs()
         cluster.start()

         assert len(FIELDS) == len(VALUES)

         for dicts in DICTIONARIES:
             for dictionary in dicts:
-                print("Preparing", dictionary.name)
+                logging.debug(f"Preparing {dictionary.name}")
                 dictionary.prepare_source(cluster)
-                print("Prepared")
+                logging.debug(f"Prepared {dictionary.name}")

         yield cluster

@@ -161,14 +157,19 @@ def started_cluster():
         cluster.shutdown()


-@pytest.mark.parametrize("id", list(range(len(FIELDS))))
+def get_entity_id(entity):
+    return FIELDS[entity].name
+
+
+@pytest.mark.parametrize("id", list(range(len(FIELDS))), ids=get_entity_id)
 def test_redis_dictionaries(started_cluster, id):
-    print("id:", id)
+    logging.debug(f"Run test with id: {id}")

     dicts = DICTIONARIES[id]
     values = VALUES[id]
     field = FIELDS[id]

+    node = started_cluster.instances["node"]
     node.query("system reload dictionaries")

     for dct in dicts:

@@ -193,10 +194,9 @@ def test_redis_dictionaries(started_cluster, id):
             for query in dct.get_select_get_or_default_queries(field, row):
                 queries_with_answers.append((query, field.default_value_for_get))

-        node.query("system reload dictionary {}".format(dct.name))
+        node.query(f"system reload dictionary {dct.name}")

     for query, answer in queries_with_answers:
-        print(query)
         assert node.query(query) == str(answer) + "\n"

     # Checks, that dictionaries can be reloaded.
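A self-contained sketch of the ids= mechanism introduced above, to show the effect on reported test names; the field names here are stand-ins, not the real FIELDS:

    import pytest

    NAMES = ["KeyField", "UInt8_", "String_"]  # stand-ins for FIELDS[i].name

    def get_entity_id(entity):
        return NAMES[entity]

    @pytest.mark.parametrize("id", list(range(len(NAMES))), ids=get_entity_id)
    def test_example(id):
        # Reported as test_example[KeyField], test_example[UInt8_], ... instead of [0], [1], ...
        assert 0 <= id < len(NAMES)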

View File

@@ -3,16 +3,11 @@ import time
 import os

 import pytest
-from helpers.cluster import ClickHouseCluster, get_instances_dir
+from helpers.cluster import ClickHouseCluster
 from helpers.utility import generate_values, replace_config, SafeThread

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(
-    SCRIPT_DIR,
-    "./{}/node/configs/config.d/storage_conf.xml".format(get_instances_dir()),
-)

 NODE_NAME = "node"
 TABLE_NAME = "blob_storage_table"
 AZURE_BLOB_STORAGE_DISK = "blob_storage_disk"

@@ -51,7 +46,7 @@ def azure_query(node, query, try_num=3):
             return node.query(query)
         except Exception as ex:
             retriable_errors = [
-                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
+                "DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
             ]
             retry = False
             for error in retriable_errors:

@@ -160,13 +155,7 @@ def test_inserts_selects(cluster):
     )


-@pytest.mark.parametrize(
-    "merge_vertical",
-    [
-        (True),
-        (False),
-    ],
-)
+@pytest.mark.parametrize("merge_vertical", [(True), (False)])
 def test_insert_same_partition_and_merge(cluster, merge_vertical):
     settings = {}
     if merge_vertical:

@@ -498,6 +487,12 @@ def test_freeze_unfreeze(cluster):
 def test_apply_new_settings(cluster):
     node = cluster.instances[NODE_NAME]
     create_table(node, TABLE_NAME)
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/node/configs/config.d/storage_conf.xml".format(
+            cluster.instances_dir_name
+        ),
+    )

     azure_query(
         node, f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}"

@@ -505,7 +500,7 @@ def test_apply_new_settings(cluster):

     # Force multi-part upload mode.
     replace_config(
-        CONFIG_PATH,
+        config_path,
         "<max_single_part_upload_size>33554432</max_single_part_upload_size>",
         "<max_single_part_upload_size>4096</max_single_part_upload_size>",
     )

@@ -522,10 +517,16 @@ def test_apply_new_settings(cluster):
 def test_restart_during_load(cluster):
     node = cluster.instances[NODE_NAME]
     create_table(node, TABLE_NAME)
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/node/configs/config.d/storage_conf.xml".format(
+            cluster.instances_dir_name
+        ),
+    )

     # Force multi-part upload mode.
     replace_config(
-        CONFIG_PATH, "<container_already_exists>false</container_already_exists>", ""
+        config_path, "<container_already_exists>false</container_already_exists>", ""
     )

     azure_query(
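The same pattern repeats in the storage tests below: the module-level CONFIG_PATH constant, computed at import time from get_instances_dir(), is replaced by a path derived from the running cluster. A rough helper-style sketch of that derivation (function name and default instance are illustrative, not from this commit):

    import os

    def storage_conf_path(cluster, script_dir, instance="node"):
        # Config lives under the per-run instances dir, e.g. _instances_gw3_12345/node/configs/...
        return os.path.join(
            script_dir,
            "./{}/{}/configs/config.d/storage_conf.xml".format(
                cluster.instances_dir_name, instance
            ),
        )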

View File

@@ -3,15 +3,11 @@ import time
 import os

 import pytest
-from helpers.cluster import ClickHouseCluster, get_instances_dir
+from helpers.cluster import ClickHouseCluster
 from helpers.utility import generate_values, replace_config, SafeThread

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(
-    SCRIPT_DIR,
-    "./{}/node/configs/config.d/storage_conf.xml".format(get_instances_dir()),
-)


 @pytest.fixture(scope="module")

@@ -562,6 +558,13 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
     node = cluster.instances[node_name]
     create_table(node, "s3_test")

+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/node/configs/config.d/storage_conf.xml".format(
+            cluster.instances_dir_name
+        ),
+    )
+
     def get_s3_requests():
         node.query("SYSTEM FLUSH LOGS")
         return int(

@@ -578,7 +581,7 @@ def test_s3_disk_apply_new_settings(cluster, node_name):

     # Force multi-part upload mode.
     replace_config(
-        CONFIG_PATH,
+        config_path,
         "<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>",
         "<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>",
     )

View File

@@ -5,28 +5,22 @@ import string
 import time

 import pytest
-from helpers.cluster import ClickHouseCluster, get_instances_dir
+from helpers.cluster import ClickHouseCluster

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-NOT_RESTORABLE_CONFIG_PATH = os.path.join(
-    SCRIPT_DIR,
-    "./{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml".format(
-        get_instances_dir()
-    ),
-)
 COMMON_CONFIGS = [
     "configs/config.d/bg_processing_pool_conf.xml",
     "configs/config.d/clusters.xml",
 ]


-def replace_config(old, new):
-    config = open(NOT_RESTORABLE_CONFIG_PATH, "r")
+def replace_config(path, old, new):
+    config = open(path, "r")
     config_lines = config.readlines()
     config.close()
     config_lines = [line.replace(old, new) for line in config_lines]
-    config = open(NOT_RESTORABLE_CONFIG_PATH, "w")
+    config = open(path, "w")
     config.writelines(config_lines)
     config.close()

@@ -507,6 +501,12 @@ def test_restore_mutations(cluster, db_atomic):
 def test_migrate_to_restorable_schema(cluster):
     db_atomic = True
     node = cluster.instances["node_not_restorable"]
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml".format(
+            cluster.instances_dir_name
+        ),
+    )

     create_table(node, "test", db_atomic=db_atomic)
     uuid = get_table_uuid(node, db_atomic, "test")

@@ -525,7 +525,9 @@ def test_migrate_to_restorable_schema(cluster):
     )

     replace_config(
-        "<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>"
+        config_path,
+        "<send_metadata>false</send_metadata>",
+        "<send_metadata>true</send_metadata>",
     )

     node.restart_clickhouse()

View File

@@ -2,18 +2,13 @@ import os
 import time

 import pytest
-from helpers.cluster import ClickHouseCluster, get_instances_dir
+from helpers.cluster import ClickHouseCluster

 cluster = ClickHouseCluster(__file__)
 node = cluster.add_instance("node", main_configs=["configs/max_table_size_to_drop.xml"])

 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
-CONFIG_PATH = os.path.join(
-    SCRIPT_DIR,
-    "./{}/node/configs/config.d/max_table_size_to_drop.xml".format(get_instances_dir()),
-)


 @pytest.fixture(scope="module")
 def start_cluster():

@@ -29,6 +24,12 @@ def start_cluster():
 def test_reload_max_table_size_to_drop(start_cluster):
     node.query("INSERT INTO test VALUES (now(), 0)")
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/node/configs/config.d/max_table_size_to_drop.xml".format(
+            start_cluster.instances_dir_name
+        ),
+    )

     time.sleep(5)  # wait for data part commit

@@ -37,14 +38,14 @@ def test_reload_max_table_size_to_drop(start_cluster):
     assert out == ""
     assert err != ""

-    config = open(CONFIG_PATH, "r")
+    config = open(config_path, "r")
     config_lines = config.readlines()
     config.close()
     config_lines = [
         line.replace("<max_table_size_to_drop>1", "<max_table_size_to_drop>1000000")
         for line in config_lines
     ]
-    config = open(CONFIG_PATH, "w")
+    config = open(config_path, "w")
     config.writelines(config_lines)
     config.close()

View File

@@ -5,7 +5,7 @@ import string
 import time

 import pytest
-from helpers.cluster import ClickHouseCluster, get_instances_dir
+from helpers.cluster import ClickHouseCluster

 COMMON_CONFIGS = ["configs/config.d/clusters.xml"]

View File

@@ -9,7 +9,7 @@ import time

 import helpers.client
 import pytest
-from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
+from helpers.cluster import ClickHouseCluster, ClickHouseInstance
 from helpers.network import PartitionManager
 from helpers.test_tools import exec_query_with_retry

@@ -17,11 +17,6 @@ MINIO_INTERNAL_PORT = 9001
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))

-CONFIG_PATH = os.path.join(
-    SCRIPT_DIR, "./{}/dummy/configs/config.d/defaultS3.xml".format(get_instances_dir())
-)
-

 # Creates S3 bucket for tests and allows anonymous read-write access to it.
 def prepare_s3_bucket(started_cluster):
     # Allows read-write access for bucket without authorization.

@@ -724,17 +719,24 @@ def run_s3_mocks(started_cluster):
     logging.info("S3 mocks started")


-def replace_config(old, new):
-    config = open(CONFIG_PATH, "r")
+def replace_config(path, old, new):
+    config = open(path, "r")
     config_lines = config.readlines()
     config.close()
     config_lines = [line.replace(old, new) for line in config_lines]
-    config = open(CONFIG_PATH, "w")
+    config = open(path, "w")
     config.writelines(config_lines)
     config.close()


 def test_custom_auth_headers(started_cluster):
+    config_path = os.path.join(
+        SCRIPT_DIR,
+        "./{}/dummy/configs/config.d/defaultS3.xml".format(
+            started_cluster.instances_dir_name
+        ),
+    )
     table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
     filename = "test.csv"
     get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(

@@ -758,6 +760,7 @@ def test_custom_auth_headers(started_cluster):
     assert run_query(instance, "SELECT * FROM test") == "1\t2\t3\n"

     replace_config(
+        config_path,
         "<header>Authorization: Bearer TOKEN",
         "<header>Authorization: Bearer INVALID_TOKEN",
     )

@@ -765,6 +768,7 @@ def test_custom_auth_headers(started_cluster):
     ret, err = instance.query_and_get_answer_with_error("SELECT * FROM test")
     assert ret == "" and err != ""

     replace_config(
+        config_path,
         "<header>Authorization: Bearer INVALID_TOKEN",
         "<header>Authorization: Bearer TOKEN",
     )

@@ -805,10 +809,7 @@ def test_infinite_redirect(started_cluster):

 @pytest.mark.parametrize(
     "extension,method",
-    [
-        pytest.param("bin", "gzip", id="bin"),
-        pytest.param("gz", "auto", id="gz"),
-    ],
+    [pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
 )
 def test_storage_s3_get_gzip(started_cluster, extension, method):
     bucket = started_cluster.minio_bucket