Merge pull request #38978 from qoega/integration-tests-7

Ilya Yatsishin 2022-08-05 07:21:18 +02:00 committed by GitHub
commit e583345987
15 changed files with 237 additions and 173 deletions

View File

@ -62,7 +62,7 @@ def pre_build(repo_path: str, env_variables: List[str]):
f"git -C {repo_path} fetch --no-recurse-submodules "
"--no-tags origin master:master"
)
logging.info("Getting master branch for performance artifact: ''%s'", cmd)
logging.info("Getting master branch for performance artifact: '%s'", cmd)
subprocess.check_call(cmd, shell=True)

View File

@ -237,6 +237,24 @@ def enable_consistent_hash_plugin(rabbitmq_id):
return p.returncode == 0
def get_instances_dir(name):
instances_dir_name = "_instances"
worker_name = os.environ.get("PYTEST_XDIST_WORKER", "")
run_id = os.environ.get("INTEGRATION_TESTS_RUN_ID", "")
if worker_name:
instances_dir_name += "_" + worker_name
if name:
instances_dir_name += "_" + name
if run_id:
instances_dir_name += "_" + shlex.quote(run_id)
return instances_dir_name
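# Not part of the diff: a minimal usage sketch of the new helper. The worker
# name, run id and test name below are hypothetical.
import os
from helpers.cluster import get_instances_dir

os.environ["PYTEST_XDIST_WORKER"] = "gw3"      # set per worker by pytest-xdist
os.environ["INTEGRATION_TESTS_RUN_ID"] = "42"  # set by the CI runner
print(get_instances_dir("allowed_hosts"))
# -> "_instances_gw3_allowed_hosts_42"; with both variables unset and an empty
#    name the helper still returns the plain "_instances".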
def extract_test_name(base_path):
"""Extracts the name of the test based to a path to its test*.py file
Must be unique in each test directory (because it's used to make instances dir and to stop docker containers from previous run)
@ -249,16 +267,6 @@ def extract_test_name(base_path):
return name
def get_instances_dir():
if (
"INTEGRATION_TESTS_RUN_ID" in os.environ
and os.environ["INTEGRATION_TESTS_RUN_ID"]
):
return "_instances_" + shlex.quote(os.environ["INTEGRATION_TESTS_RUN_ID"])
else:
return "_instances"
class ClickHouseCluster:
"""ClickHouse cluster with several instances and (possibly) ZooKeeper.
@ -318,19 +326,8 @@ class ClickHouseCluster:
)
# docker-compose removes everything non-alphanumeric from project names so we do it too.
self.project_name = re.sub(r"[^a-z0-9]", "", project_name.lower())
instances_dir_name = "_instances"
if self.name:
instances_dir_name += "_" + self.name
if (
"INTEGRATION_TESTS_RUN_ID" in os.environ
and os.environ["INTEGRATION_TESTS_RUN_ID"]
):
instances_dir_name += "_" + shlex.quote(
os.environ["INTEGRATION_TESTS_RUN_ID"]
)
self.instances_dir = p.join(self.base_dir, instances_dir_name)
self.instances_dir_name = get_instances_dir(self.name)
self.instances_dir = p.join(self.base_dir, self.instances_dir_name)
self.docker_logs_path = p.join(self.instances_dir, "docker.log")
self.env_file = p.join(self.instances_dir, DEFAULT_ENV_NAME)
self.env_variables = {}
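# Not part of the diff: because instances_dir_name is now kept on the cluster
# object, test modules can rebuild per-run config paths from it instead of the
# old module-level get_instances_dir() pattern, roughly (names illustrative):
#
#     SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
#     config_path = os.path.join(
#         SCRIPT_DIR,
#         "./{}/node/configs/config.d/storage_conf.xml".format(cluster.instances_dir_name),
#     )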
@ -551,8 +548,41 @@ class ClickHouseCluster:
self.is_up = False
self.env = os.environ.copy()
logging.debug(f"CLUSTER INIT base_config_dir:{self.base_config_dir}")
if p.exists(self.instances_dir):
shutil.rmtree(self.instances_dir, ignore_errors=True)
logging.debug(f"Removed :{self.instances_dir}")
os.mkdir(self.instances_dir)
def print_all_docker_pieces(self):
res_networks = subprocess.check_output(
f"docker network ls --filter name='{self.project_name}*'",
shell=True,
universal_newlines=True,
)
logging.debug(
f"Docker networks for project {self.project_name} are {res_networks}"
)
res_containers = subprocess.check_output(
f"docker container ls -a --filter name='{self.project_name}*'",
shell=True,
universal_newlines=True,
)
logging.debug(
f"Docker containers for project {self.project_name} are {res_containers}"
)
res_volumes = subprocess.check_output(
f"docker volume ls --filter name='{self.project_name}*'",
shell=True,
universal_newlines=True,
)
logging.debug(
f"Docker volumes for project {self.project_name} are {res_volumes}"
)
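# Not part of the diff: an illustrative way to use the new diagnostic dump when
# chasing leftover Docker resources; the cluster name here is hypothetical.
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__, name="debug")
cluster.print_all_docker_pieces()  # logs networks, containers and volumes matching the project name
cluster.cleanup()                  # then removes whatever is still around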
def cleanup(self):
logging.debug("Cleanup called")
self.print_all_docker_pieces()
if (
os.environ
and "DISABLE_CLEANUP" in os.environ
@ -564,7 +594,8 @@ class ClickHouseCluster:
# Just in case kill unstopped containers from previous launch
try:
unstopped_containers = self.get_running_containers()
if unstopped_containers:
logging.debug(f"Unstopped containers: {unstopped_containers}")
if len(unstopped_containers):
logging.debug(
f"Trying to kill unstopped containers: {unstopped_containers}"
)
@ -578,26 +609,33 @@ class ClickHouseCluster:
logging.debug(f"Unstopped containers killed.")
else:
logging.debug(f"No running containers for project: {self.project_name}")
except Exception as ex:
logging.debug(f"Got exception removing containers {str(ex)}")
# # Just in case remove unused networks
try:
logging.debug("Trying to prune unused networks...")
list_networks = subprocess.check_output(
f"docker network ls -q --filter name='{self.project_name}'",
shell=True,
universal_newlines=True,
).splitlines()
if list_networks:
logging.debug(f"Trying to remove networks: {list_networks}")
run_and_check(f"docker network rm {' '.join(list_networks)}")
logging.debug(f"Networks removed: {list_networks}")
except:
pass
# # Just in case remove unused networks
# try:
# logging.debug("Trying to prune unused networks...")
# run_and_check(['docker', 'network', 'prune', '-f'])
# logging.debug("Networks pruned")
# except:
# pass
# Remove unused images
# try:
# logging.debug("Trying to prune unused images...")
try:
logging.debug("Trying to prune unused images...")
# run_and_check(['docker', 'image', 'prune', '-f'])
# logging.debug("Images pruned")
# except:
# pass
run_and_check(["docker", "image", "prune", "-f"])
logging.debug("Images pruned")
except:
pass
# Remove unused volumes
try:
@ -640,9 +678,7 @@ class ClickHouseCluster:
f"docker container list --all --filter name='{filter_name}' --format '{format}'",
shell=True,
)
containers = dict(
line.split(":", 1) for line in containers.decode("utf8").splitlines()
)
containers = dict(line.split(":", 1) for line in containers.splitlines())
return containers
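# Not part of the diff: the dict() above parses "id:name" lines produced by
# docker container list --format; a self-contained sketch with made-up values.
containers_output = "1a2b3c:roottestexample_node1_1\n4d5e6f:roottestexample_zoo1_1"
containers = dict(line.split(":", 1) for line in containers_output.splitlines())
# {'1a2b3c': 'roottestexample_node1_1', '4d5e6f': 'roottestexample_zoo1_1'}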
def copy_file_from_container_to_container(
@ -1818,7 +1854,7 @@ class ClickHouseCluster:
errors += [str(ex)]
time.sleep(0.5)
run_and_check(["docker-compose", "ps", "--services", "--all"])
run_and_check(["docker", "ps", "--all"])
logging.error("Can't connect to MySQL:{}".format(errors))
raise Exception("Cannot wait MySQL container")
@ -1840,7 +1876,7 @@ class ClickHouseCluster:
logging.debug("Can't connect to MySQL 8 " + str(ex))
time.sleep(0.5)
run_and_check(["docker-compose", "ps", "--services", "--all"])
run_and_check(["docker", "ps", "--all"])
raise Exception("Cannot wait MySQL 8 container")
def wait_mysql_cluster_to_start(self, timeout=180):
@ -1865,7 +1901,7 @@ class ClickHouseCluster:
errors += [str(ex)]
time.sleep(0.5)
run_and_check(["docker-compose", "ps", "--services", "--all"])
run_and_check(["docker", "ps", "--all"])
logging.error("Can't connect to MySQL:{}".format(errors))
raise Exception("Cannot wait MySQL container")
@ -2180,6 +2216,14 @@ class ClickHouseCluster:
logging.debug("Can't connect to Minio: %s", str(ex))
time.sleep(1)
try:
with open(os.path.join(self.minio_dir, "docker.log"), "w+") as f:
subprocess.check_call( # STYLE_CHECK_ALLOW_SUBPROCESS_CHECK_CALL
self.base_minio_cmd + ["logs"], stdout=f
)
except Exception as e:
logging.debug("Unable to get logs from docker.")
raise Exception("Can't wait Minio to start")
def wait_azurite_to_start(self, timeout=180):
@ -2250,15 +2294,13 @@ class ClickHouseCluster:
raise Exception("Can't wait Cassandra to start")
def start(self, destroy_dirs=True):
def start(self):
pytest_xdist_logging_to_separate_files.setup()
logging.info("Running tests in {}".format(self.base_path))
logging.debug(
"Cluster start called. is_up={}, destroy_dirs={}".format(
self.is_up, destroy_dirs
)
)
logging.debug(f"Cluster start called. is_up={self.is_up}")
self.print_all_docker_pieces()
if self.is_up:
return
@ -2268,23 +2310,9 @@ class ClickHouseCluster:
logging.warning("Cleanup failed:{e}")
try:
# clickhouse_pull_cmd = self.base_cmd + ['pull']
# print(f"Pulling images for {self.base_cmd}")
# retry_exception(10, 5, subprocess_check_call, Exception, clickhouse_pull_cmd)
if destroy_dirs and p.exists(self.instances_dir):
logging.debug(f"Removing instances dir {self.instances_dir}")
shutil.rmtree(self.instances_dir)
for instance in list(self.instances.values()):
logging.debug(
(
"Setup directory for instance: {} destroy_dirs: {}".format(
instance.name, destroy_dirs
)
)
)
instance.create_dir(destroy_dir=destroy_dirs)
logging.debug(f"Setup directory for instance: {instance.name}")
instance.create_dir()
_create_env_file(os.path.join(self.env_file), self.env_variables)
self.docker_client = docker.DockerClient(
@ -2684,13 +2712,9 @@ class ClickHouseCluster:
def pause_container(self, instance_name):
subprocess_check_call(self.base_cmd + ["pause", instance_name])
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGSTOP', instance_name])
def unpause_container(self, instance_name):
subprocess_check_call(self.base_cmd + ["unpause", instance_name])
# subprocess_check_call(self.base_cmd + ['kill', '-s SIGCONT', instance_name])
def open_bash_shell(self, instance_name):
os.system(" ".join(self.base_cmd + ["exec", instance_name, "/bin/bash"]))
@ -3626,14 +3650,14 @@ class ClickHouseInstance:
"bash",
"-c",
"echo 'ATTACH DATABASE system ENGINE=Ordinary' > /var/lib/clickhouse/metadata/system.sql",
],
]
)
self.exec_in_container(
[
"bash",
"-c",
"echo 'ATTACH DATABASE system ENGINE=Ordinary' > /var/lib/clickhouse/metadata/default.sql",
],
]
)
self.exec_in_container(
["bash", "-c", "{} --daemon".format(self.clickhouse_start_command)],
@ -3788,14 +3812,9 @@ class ClickHouseInstance:
["bash", "-c", f"sed -i 's/{replace}/{replacement}/g' {path_to_config}"]
)
def create_dir(self, destroy_dir=True):
def create_dir(self):
"""Create the instance directory and all the needed files there."""
if destroy_dir:
self.destroy_dir()
elif p.exists(self.path):
return
os.makedirs(self.path)
instance_config_dir = p.abspath(p.join(self.path, "configs"))
@ -4059,10 +4078,6 @@ class ClickHouseInstance:
)
)
def destroy_dir(self):
if p.exists(self.path):
shutil.rmtree(self.path)
def wait_for_path_exists(self, path, seconds):
while seconds > 0:
seconds -= 1

View File

@ -1,5 +1,5 @@
[pytest]
python_files = test*.py
python_files = test_*/test*.py
norecursedirs = _instances*
timeout = 900
junit_duration_report = call
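# Not part of the diff: with the narrower glob only test modules that live in a
# test_* directory are collected, e.g. (paths illustrative):
#   test_storage_s3/test.py    -> collected
#   test_storage_s3/s3tools.py -> ignored (file name does not match test*.py)
#   helpers/test_tools.py      -> ignored (parent directory is not test_*)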

View File

@ -383,6 +383,7 @@ if __name__ == "__main__":
--volume=/run:/run/host:ro \
{dockerd_internal_volume} -e DOCKER_CLIENT_TIMEOUT=300 -e COMPOSE_HTTP_TIMEOUT=600 \
-e XTABLES_LOCKFILE=/run/host/xtables.lock \
-e PYTHONUNBUFFERED=1 \
{env_tags} {env_cleanup} -e PYTEST_OPTS='{parallel} {opts} {tests_list} -vvv' {img} {command}".format(
net=net,
tty=tty,

View File

@ -26,9 +26,6 @@
<ip>127.0.0.1</ip>
<host>clientA1.com</host>
<host>clientA3.com</host>
<host_regexp>clientB\d+\.ru</host_regexp>
<host_regexp>clientC\d+\.ru$</host_regexp>
<host_regexp>^clientD\d+\.ru$</host_regexp>
</networks>
</default>
</users>

View File

@ -1,4 +1,5 @@
import pytest
import logging
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
@ -7,15 +8,6 @@ server = cluster.add_instance("server", user_configs=["configs/users.d/network.x
clientA1 = cluster.add_instance("clientA1", hostname="clientA1.com")
clientA2 = cluster.add_instance("clientA2", hostname="clientA2.com")
clientA3 = cluster.add_instance("clientA3", hostname="clientA3.com")
clientB1 = cluster.add_instance("clientB1", hostname="clientB001.ru")
clientB2 = cluster.add_instance("clientB2", hostname="clientB002.ru")
clientB3 = cluster.add_instance("clientB3", hostname="xxx.clientB003.rutracker.com")
clientC1 = cluster.add_instance("clientC1", hostname="clientC01.ru")
clientC2 = cluster.add_instance("clientC2", hostname="xxx.clientC02.ru")
clientC3 = cluster.add_instance("clientC3", hostname="xxx.clientC03.rutracker.com")
clientD1 = cluster.add_instance("clientD1", hostname="clientD0001.ru")
clientD2 = cluster.add_instance("clientD2", hostname="xxx.clientD0002.ru")
clientD3 = cluster.add_instance("clientD3", hostname="clientD0003.ru")
def check_clickhouse_is_ok(client_node, server_node):
@ -29,6 +21,13 @@ def check_clickhouse_is_ok(client_node, server_node):
def query_from_one_node_to_another(client_node, server_node, query):
check_clickhouse_is_ok(client_node, server_node)
res1 = client_node.exec_in_container(["ip", "address", "show"])
res2 = client_node.exec_in_container(["host", "clientA1.com"])
res3 = client_node.exec_in_container(["host", "clientA2.com"])
res4 = client_node.exec_in_container(["host", "clientA3.com"])
logging.debug(f"IP: {res1}, A1 {res2}, A2 {res3}, A3 {res4}")
return client_node.exec_in_container(
[
"bash",
@ -55,6 +54,13 @@ def setup_nodes():
)
query(server, "INSERT INTO test_allowed_client_hosts VALUES (5)")
s = query(server, "SELECT fqdn(), hostName()")
a1 = query(clientA1, "SELECT fqdn(), hostName()")
a2 = query(clientA2, "SELECT fqdn(), hostName()")
a3 = query(clientA3, "SELECT fqdn(), hostName()")
logging.debug(f"s:{s}, a1:{a1}, a2:{a2}, a3:{a3}")
yield cluster
finally:
@ -63,7 +69,6 @@ def setup_nodes():
def test_allowed_host():
expected_to_pass = [clientA1, clientA3]
expected_to_fail = [clientA2]
# Reverse DNS lookup currently isn't working as expected in this test.
# For example, it gives something like "vitbartestallowedclienthosts_clientB1_1.vitbartestallowedclienthosts_default" instead of "clientB001.ru".
@ -79,6 +84,10 @@ def test_allowed_host():
== "5\n"
)
def test_denied_host():
expected_to_fail = [clientA2]
for client_node in expected_to_fail:
with pytest.raises(Exception, match=r"default: Authentication failed"):
query_from_one_node_to_another(

View File

@ -49,6 +49,8 @@ def test_config_with_hosts(start_cluster):
assert "not allowed" in node1.query_and_get_error(
"CREATE TABLE table_test_1_4 (word String) Engine=URL('https://yandex2.ru', CSV)"
)
node1.query("DROP TABLE table_test_1_1")
node1.query("DROP TABLE table_test_1_2")
def test_config_with_only_primary_hosts(start_cluster):
@ -86,6 +88,11 @@ def test_config_with_only_primary_hosts(start_cluster):
"CREATE TABLE table_test_2_6 (word String) Engine=URL('https://yandex2.ru', CSV)"
)
node2.query("DROP TABLE table_test_2_1")
node2.query("DROP TABLE table_test_2_2")
node2.query("DROP TABLE table_test_2_3")
node2.query("DROP TABLE table_test_2_4")
def test_config_with_only_regexp_hosts(start_cluster):
assert (
@ -106,6 +113,8 @@ def test_config_with_only_regexp_hosts(start_cluster):
assert "not allowed" in node3.query_and_get_error(
"CREATE TABLE table_test_3_4 (word String) Engine=URL('https://yandex2.ru', CSV)"
)
node3.query("DROP TABLE table_test_3_1")
node3.query("DROP TABLE table_test_3_2")
def test_config_without_allowed_hosts_section(start_cluster):
@ -139,6 +148,11 @@ def test_config_without_allowed_hosts_section(start_cluster):
)
== ""
)
node4.query("DROP TABLE table_test_4_1")
node4.query("DROP TABLE table_test_4_2")
node4.query("DROP TABLE table_test_4_3")
node4.query("DROP TABLE table_test_4_4")
node4.query("DROP TABLE table_test_4_5")
def test_config_without_allowed_hosts(start_cluster):
@ -267,6 +281,7 @@ def test_redirect(start_cluster):
assert "not allowed" in node7.query_and_get_error(
"SET max_http_get_redirects=1; SELECT * from table_test_7_1"
)
node7.query("DROP TABLE table_test_7_1")
def test_HDFS(start_cluster):

View File

@ -1,14 +1,14 @@
import os
import shutil
import pytest
import logging
from helpers.cluster import ClickHouseCluster
from helpers.dictionary import Field, Row, Dictionary, DictionaryStructure, Layout
from helpers.external_sources import SourceRedis
cluster = None
cluster = ClickHouseCluster(__file__)
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
dict_configs_path = os.path.join(SCRIPT_DIR, "configs/dictionaries")
node = None
KEY_FIELDS = {
"simple": [Field("KeyField", "UInt64", is_key=True, default_value_for_get=9999999)],
@ -70,8 +70,6 @@ DICTIONARIES = []
def get_dict(source, layout, fields, suffix_name=""):
global dict_configs_path
structure = DictionaryStructure(layout, fields)
dict_name = source.name + "_" + layout.name + "_" + suffix_name
dict_path = os.path.join(dict_configs_path, dict_name + ".xml")
@ -82,13 +80,9 @@ def get_dict(source, layout, fields, suffix_name=""):
return dictionary
def setup_module(module):
def generate_dict_configs():
global DICTIONARIES
global cluster
global node
global dict_configs_path
cluster = ClickHouseCluster(__file__)
if os.path.exists(dict_configs_path):
shutil.rmtree(dict_configs_path)
@ -126,8 +120,8 @@ def setup_module(module):
for source in sources:
for layout in LAYOUTS:
if not source.compatible_with_layout(layout):
print(
"Source", source.name, "incompatible with layout", layout.name
logging.debug(
f"Source {source.name} incompatible with layout {layout.name}"
)
continue
@ -137,7 +131,9 @@ def setup_module(module):
main_configs = []
dictionaries = []
for fname in os.listdir(dict_configs_path):
dictionaries.append(os.path.join(dict_configs_path, fname))
path = os.path.join(dict_configs_path, fname)
logging.debug(f"Found dictionary {path}")
dictionaries.append(path)
node = cluster.add_instance(
"node", main_configs=main_configs, dictionaries=dictionaries, with_redis=True
@ -147,13 +143,15 @@ def setup_module(module):
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
generate_dict_configs()
cluster.start()
assert len(FIELDS) == len(VALUES)
for dicts in DICTIONARIES:
for dictionary in dicts:
print("Preparing", dictionary.name)
logging.debug(f"Preparing {dictionary.name}")
dictionary.prepare_source(cluster)
print("Prepared")
logging.debug(f"Prepared {dictionary.name}")
yield cluster
@ -161,14 +159,19 @@ def started_cluster():
cluster.shutdown()
@pytest.mark.parametrize("id", list(range(len(FIELDS))))
def get_entity_id(entity):
return FIELDS[entity].name
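# Not part of the diff: ids=get_entity_id only changes the generated test IDs;
# field names below are hypothetical.
#   before: test_redis_dictionaries[0], test_redis_dictionaries[1], ...
#   after:  test_redis_dictionaries[KeyField], test_redis_dictionaries[UInt8_field], ...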
@pytest.mark.parametrize("id", list(range(len(FIELDS))), ids=get_entity_id)
def test_redis_dictionaries(started_cluster, id):
print("id:", id)
logging.debug(f"Run test with id: {id}")
dicts = DICTIONARIES[id]
values = VALUES[id]
field = FIELDS[id]
node = started_cluster.instances["node"]
node.query("system reload dictionaries")
for dct in dicts:
@ -193,10 +196,9 @@ def test_redis_dictionaries(started_cluster, id):
for query in dct.get_select_get_or_default_queries(field, row):
queries_with_answers.append((query, field.default_value_for_get))
node.query("system reload dictionary {}".format(dct.name))
node.query(f"system reload dictionary {dct.name}")
for query, answer in queries_with_answers:
print(query)
assert node.query(query) == str(answer) + "\n"
# Checks that dictionaries can be reloaded.

View File

@ -3,16 +3,11 @@ import time
import os
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(get_instances_dir()),
)
NODE_NAME = "node"
TABLE_NAME = "blob_storage_table"
AZURE_BLOB_STORAGE_DISK = "blob_storage_disk"
@ -51,7 +46,7 @@ def azure_query(node, query, try_num=3):
return node.query(query)
except Exception as ex:
retriable_errors = [
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response",
"DB::Exception: Azure::Core::Http::TransportException: Connection was closed by the server while trying to read a response"
]
retry = False
for error in retriable_errors:
@ -160,13 +155,7 @@ def test_inserts_selects(cluster):
)
@pytest.mark.parametrize(
"merge_vertical",
[
(True),
(False),
],
)
@pytest.mark.parametrize("merge_vertical", [(True), (False)])
def test_insert_same_partition_and_merge(cluster, merge_vertical):
settings = {}
if merge_vertical:
@ -498,6 +487,12 @@ def test_freeze_unfreeze(cluster):
def test_apply_new_settings(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
)
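# Not part of the diff: under a hypothetical xdist worker "gw1" and run id "7"
# this resolves to ./_instances_gw1_7/node/configs/config.d/storage_conf.xml,
# so each worker/run patches its own generated config instead of the removed
# shared CONFIG_PATH constant.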
azure_query(
node, f"INSERT INTO {TABLE_NAME} VALUES {generate_values('2020-01-03', 4096)}"
@ -505,7 +500,7 @@ def test_apply_new_settings(cluster):
# Force multi-part upload mode.
replace_config(
CONFIG_PATH,
config_path,
"<max_single_part_upload_size>33554432</max_single_part_upload_size>",
"<max_single_part_upload_size>4096</max_single_part_upload_size>",
)
@ -522,10 +517,16 @@ def test_apply_new_settings(cluster):
def test_restart_during_load(cluster):
node = cluster.instances[NODE_NAME]
create_table(node, TABLE_NAME)
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
)
# Force multi-part upload mode.
replace_config(
CONFIG_PATH, "<container_already_exists>false</container_already_exists>", ""
config_path, "<container_already_exists>false</container_already_exists>", ""
)
azure_query(

View File

@ -3,15 +3,11 @@ import time
import os
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.cluster import ClickHouseCluster
from helpers.utility import generate_values, replace_config, SafeThread
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(get_instances_dir()),
)
@pytest.fixture(scope="module")
@ -577,6 +573,13 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
node = cluster.instances[node_name]
create_table(node, "s3_test")
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/storage_conf.xml".format(
cluster.instances_dir_name
),
)
def get_s3_requests():
node.query("SYSTEM FLUSH LOGS")
return int(
@ -593,7 +596,7 @@ def test_s3_disk_apply_new_settings(cluster, node_name):
# Force multi-part upload mode.
replace_config(
CONFIG_PATH,
config_path,
"<s3_max_single_part_upload_size>33554432</s3_max_single_part_upload_size>",
"<s3_max_single_part_upload_size>0</s3_max_single_part_upload_size>",
)

View File

@ -5,28 +5,22 @@ import string
import time
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.cluster import ClickHouseCluster
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
NOT_RESTORABLE_CONFIG_PATH = os.path.join(
SCRIPT_DIR,
"./{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml".format(
get_instances_dir()
),
)
COMMON_CONFIGS = [
"configs/config.d/bg_processing_pool_conf.xml",
"configs/config.d/clusters.xml",
]
def replace_config(old, new):
config = open(NOT_RESTORABLE_CONFIG_PATH, "r")
def replace_config(path, old, new):
config = open(path, "r")
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(NOT_RESTORABLE_CONFIG_PATH, "w")
config = open(path, "w")
config.writelines(config_lines)
config.close()
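# Not part of the diff: an equivalent, more idiomatic form of the helper using
# context managers; behavior is unchanged.
def replace_config(path, old, new):
    # Read, substitute and write the file back in place.
    with open(path, "r") as config:
        config_lines = [line.replace(old, new) for line in config.readlines()]
    with open(path, "w") as config:
        config.writelines(config_lines)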
@ -507,6 +501,12 @@ def test_restore_mutations(cluster, db_atomic):
def test_migrate_to_restorable_schema(cluster):
db_atomic = True
node = cluster.instances["node_not_restorable"]
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node_not_restorable/configs/config.d/storage_conf_not_restorable.xml".format(
cluster.instances_dir_name
),
)
create_table(node, "test", db_atomic=db_atomic)
uuid = get_table_uuid(node, db_atomic, "test")
@ -525,7 +525,9 @@ def test_migrate_to_restorable_schema(cluster):
)
replace_config(
"<send_metadata>false</send_metadata>", "<send_metadata>true</send_metadata>"
config_path,
"<send_metadata>false</send_metadata>",
"<send_metadata>true</send_metadata>",
)
node.restart_clickhouse()

View File

@ -2,18 +2,13 @@ import os
import time
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/max_table_size_to_drop.xml"])
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/max_table_size_to_drop.xml".format(get_instances_dir()),
)
@pytest.fixture(scope="module")
def start_cluster():
@ -29,6 +24,12 @@ def start_cluster():
def test_reload_max_table_size_to_drop(start_cluster):
node.query("INSERT INTO test VALUES (now(), 0)")
config_path = os.path.join(
SCRIPT_DIR,
"./{}/node/configs/config.d/max_table_size_to_drop.xml".format(
start_cluster.instances_dir_name
),
)
time.sleep(5) # wait for data part commit
@ -37,14 +38,14 @@ def test_reload_max_table_size_to_drop(start_cluster):
assert out == ""
assert err != ""
config = open(CONFIG_PATH, "r")
config = open(config_path, "r")
config_lines = config.readlines()
config.close()
config_lines = [
line.replace("<max_table_size_to_drop>1", "<max_table_size_to_drop>1000000")
for line in config_lines
]
config = open(CONFIG_PATH, "w")
config = open(config_path, "w")
config.writelines(config_lines)
config.close()

View File

@ -5,7 +5,7 @@ import string
import time
import pytest
from helpers.cluster import ClickHouseCluster, get_instances_dir
from helpers.cluster import ClickHouseCluster
COMMON_CONFIGS = ["configs/config.d/clusters.xml"]

View File

@ -2590,9 +2590,17 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
rabbitmq_exchange_name = 'mv',
rabbitmq_format = 'JSONEachRow',
rabbitmq_queue_base = 'drop_mv';
"""
)
instance.query(
"""
CREATE TABLE test.view (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
"""
)
instance.query(
"""
CREATE MATERIALIZED VIEW test.consumer TO test.view AS
SELECT * FROM test.rabbitmq;
"""
@ -2611,6 +2619,14 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
exchange="mv", routing_key="", body=json.dumps({"key": i, "value": i})
)
start = time.time()
while time.time() - start < 30:
res = instance.query("SELECT COUNT(*) FROM test.view")
if "20" == res:
break
else:
logging.debug(f"Number of rows in test.view: {res}")
instance.query("DROP VIEW test.consumer")
for i in range(20, 40):
channel.basic_publish(
@ -2643,7 +2659,8 @@ def test_rabbitmq_drop_mv(rabbitmq_cluster):
connection.close()
count = 0
while True:
start = time.time()
while time.time() - start < 30:
count = int(instance.query("SELECT count() FROM test.rabbitmq"))
if count:
break
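# Not part of the diff: the bounded wait loops above could be factored into a
# small helper; a hypothetical sketch reusing this module's time/logging imports.
def wait_for_row_count(node, table, expected, timeout=30):
    # Poll until the table reaches the expected row count or the timeout expires.
    deadline = time.time() + timeout
    while time.time() < deadline:
        res = node.query(f"SELECT count() FROM {table}")
        if int(res) == expected:
            return True
        logging.debug(f"Rows in {table}: {res.strip()}")
        time.sleep(0.5)
    return False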
@ -2685,7 +2702,7 @@ def test_rabbitmq_random_detach(rabbitmq_cluster):
channel = connection.channel()
messages = []
for i in range(messages_num):
for j in range(messages_num):
messages.append(json.dumps({"key": i[0], "value": i[0]}))
i[0] += 1
mes_id = str(i)

View File

@ -9,7 +9,7 @@ import time
import helpers.client
import pytest
from helpers.cluster import ClickHouseCluster, ClickHouseInstance, get_instances_dir
from helpers.cluster import ClickHouseCluster, ClickHouseInstance
from helpers.network import PartitionManager
from helpers.test_tools import exec_query_with_retry
@ -17,11 +17,6 @@ MINIO_INTERNAL_PORT = 9001
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
CONFIG_PATH = os.path.join(
SCRIPT_DIR, "./{}/dummy/configs/config.d/defaultS3.xml".format(get_instances_dir())
)
# Creates S3 bucket for tests and allows anonymous read-write access to it.
def prepare_s3_bucket(started_cluster):
# Allows read-write access for bucket without authorization.
@ -724,17 +719,24 @@ def run_s3_mocks(started_cluster):
logging.info("S3 mocks started")
def replace_config(old, new):
config = open(CONFIG_PATH, "r")
def replace_config(path, old, new):
config = open(path, "r")
config_lines = config.readlines()
config.close()
config_lines = [line.replace(old, new) for line in config_lines]
config = open(CONFIG_PATH, "w")
config = open(path, "w")
config.writelines(config_lines)
config.close()
def test_custom_auth_headers(started_cluster):
config_path = os.path.join(
SCRIPT_DIR,
"./{}/dummy/configs/config.d/defaultS3.xml".format(
started_cluster.instances_dir_name
),
)
table_format = "column1 UInt32, column2 UInt32, column3 UInt32"
filename = "test.csv"
get_query = "select * from s3('http://resolver:8080/{bucket}/{file}', 'CSV', '{table_format}')".format(
@ -758,6 +760,7 @@ def test_custom_auth_headers(started_cluster):
assert run_query(instance, "SELECT * FROM test") == "1\t2\t3\n"
replace_config(
config_path,
"<header>Authorization: Bearer TOKEN",
"<header>Authorization: Bearer INVALID_TOKEN",
)
@ -765,6 +768,7 @@ def test_custom_auth_headers(started_cluster):
ret, err = instance.query_and_get_answer_with_error("SELECT * FROM test")
assert ret == "" and err != ""
replace_config(
config_path,
"<header>Authorization: Bearer INVALID_TOKEN",
"<header>Authorization: Bearer TOKEN",
)
@ -805,10 +809,7 @@ def test_infinite_redirect(started_cluster):
@pytest.mark.parametrize(
"extension,method",
[
pytest.param("bin", "gzip", id="bin"),
pytest.param("gz", "auto", id="gz"),
],
[pytest.param("bin", "gzip", id="bin"), pytest.param("gz", "auto", id="gz")],
)
def test_storage_s3_get_gzip(started_cluster, extension, method):
bucket = started_cluster.minio_bucket