ClickHouse/tests/integration/test_distributed_ddl_parallel/test.py
2024-09-27 10:19:49 +00:00

191 lines
4.5 KiB
Python

# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
import threading
import time
from functools import wraps
import pytest
from helpers.cluster import ClickHouseCluster
# Cluster object shared by add_instance(), the start_cluster fixture and check_log().
cluster = ClickHouseCluster(__file__)
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed; they are only printed to stderr).
#
# Wrap threading.Thread and re-raise the exception on join().
class SafeThread(threading.Thread):
    """Thread whose failures are reported to the code that joins it.

    An exception raised by ``target`` is captured in ``run()`` and re-raised
    from ``join()``, so a failing worker fails the test instead of only
    printing a traceback to stderr.
    """

    def __init__(self, target):
        """Store ``target``, a zero-argument callable executed by ``run()``."""
        super().__init__()
        self.target = target
        # Exception raised by the target, if any; re-raised from join().
        self.exception = None

    def run(self):
        """Call the target and remember any exception it raises."""
        try:
            self.target()
        except Exception as e:  # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread and surface its outcome.

        Args:
            timeout: Maximum number of seconds to wait, or ``None`` to wait
                until the thread terminates.

        Raises:
            Exception: Whatever the target raised, re-raised as is.
            TimeoutError: The thread is still running after ``timeout``
                seconds. ``threading.Thread.join`` returns silently in that
                case, which would let a hung worker go unnoticed.
        """
        super().join(timeout)
        if self.exception is not None:
            raise self.exception
        if self.is_alive():
            raise TimeoutError(
                f"{self.name} did not finish within {timeout} seconds"
            )
def add_instance(name, ddl_config=None):
    """Add an instance called ``name`` to the module-level cluster.

    Every instance gets ``configs/remote_servers.xml`` and the
    ``configs/dict.xml`` dictionaries, and is started with ZooKeeper.

    Args:
        name: Instance name passed to ``cluster.add_instance``.
        ddl_config: Optional extra main config file; appended after the
            remote servers config when given.

    Returns:
        Whatever ``cluster.add_instance`` returns for the new instance.
    """
    extra_configs = [ddl_config] if ddl_config else []
    return cluster.add_instance(
        name,
        main_configs=["configs/remote_servers.xml"] + extra_configs,
        dictionaries=["configs/dict.xml"],
        with_zookeeper=True,
    )
# Instance that every test query is sent from; it gets no extra DDL config.
initiator = add_instance("initiator")
# configs/ddl_a.xml: distributed_ddl.pool_size = 2
n1 = add_instance("n1", "configs/ddl_a.xml")
n2 = add_instance("n2", "configs/ddl_a.xml")
# configs/ddl_b.xml: distributed_ddl.pool_size = 20
n3 = add_instance("n3", "configs/ddl_b.xml")
n4 = add_instance("n4", "configs/ddl_b.xml")
@pytest.fixture(scope="module", autouse=True)
def start_cluster():
    """Start the cluster once per module and yield it to the tests.

    The fixture is autouse, so tests do not have to request it explicitly.
    """
    try:
        cluster.start()
        yield cluster
    finally:
        # start() is inside the try block, so shutdown also runs when the
        # start-up itself fails, not only on regular teardown.
        cluster.shutdown()
# Verifies that the decorated function runs for at least `sec` seconds.
def longer_then(sec):
    """Build a decorator asserting that each call lasts at least ``sec`` seconds.

    Args:
        sec: Minimum duration, in seconds, that a call must take.

    Returns:
        A decorator. The decorated function passes its arguments and return
        value through unchanged, and raises ``AssertionError`` when a call
        completes in less than ``sec`` seconds.
    """

    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Use the monotonic clock: the wall clock (time.time) can jump
            # when the system time is adjusted, which would skew the result.
            started = time.monotonic()
            result = func(*args, **kwargs)
            took = time.monotonic() - started
            label = getattr(func, "__name__", repr(func))
            assert (
                took >= sec
            ), f"{label} took {took:.3f}s, expected at least {sec}s"
            return result

        return inner

    return wrapper
# It takes 7 seconds to load slow_dict_7.
def execute_reload_dictionary_slow_dict_7():
    """Reload ``slow_dict_7`` ON CLUSTER ``cluster_a``, issued from the initiator.

    The query runs with ``distributed_ddl_task_timeout`` set to 60.
    """
    ddl_settings = {"distributed_ddl_task_timeout": 60}
    initiator.query(
        "SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7",
        settings=ddl_settings,
    )
def execute_reload_dictionary_slow_dict_3():
    """Reload ``slow_dict_3`` ON CLUSTER ``cluster_b``, issued from the initiator.

    The query runs with ``distributed_ddl_task_timeout`` set to 60.
    """
    ddl_settings = {"distributed_ddl_task_timeout": 60}
    initiator.query(
        "SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3",
        settings=ddl_settings,
    )
def execute_smoke_query():
    """Run a cheap ON CLUSTER DDL (drop database ``foo`` if it exists) on ``cluster_b``.

    The query is issued from the initiator with
    ``distributed_ddl_task_timeout`` set to 60.
    """
    ddl_settings = {"distributed_ddl_task_timeout": 60}
    initiator.query(
        "DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b",
        settings=ddl_settings,
    )
def check_log():
    """Assert that no instance logged "Coordination::Exception: Node exists".

    That message in a log is taken as a sign that some task was processed
    more than once.
    """
    marker = "Coordination::Exception: Node exists"
    for instance in cluster.instances.values():
        assert not instance.contains_in_log(marker)
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
def test_slow_dict_load_7():
    """A local reload of ``slow_dict_7`` must take at least 7 seconds."""

    # NOTE(review): the timeout mark is attached to a nested helper that is
    # called directly rather than collected by pytest; confirm that
    # pytest-timeout actually enforces it here.
    @pytest.mark.timeout(10)
    @longer_then(7)
    def timed_reload():
        initiator.query("SYSTEM RELOAD DICTIONARY slow_dict_7")

    timed_reload()
def test_all_in_parallel():
    """Two concurrent ON CLUSTER reloads of ``slow_dict_7`` take at least 7 seconds.

    The 10-second timeout mark suggests both reloads are expected to run
    side by side rather than one after the other.
    """

    @pytest.mark.timeout(10)
    @longer_then(7)
    def run_workers():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(2)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(70)

    run_workers()
    check_log()
def test_two_in_parallel_two_queued():
    """Four concurrent ON CLUSTER reloads of ``slow_dict_7`` take at least 14 seconds.

    Presumably only two of them can run at once (pool size 2), so the other
    two wait in the queue for a second 7-second round -- verify against
    the DDL config used by the target cluster.
    """

    @pytest.mark.timeout(19)
    @longer_then(14)
    def run_workers():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(4)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join(70)

    run_workers()
    check_log()
def test_smoke():
    """Run the smoke DDL 100 times in a row, then check the logs."""
    repetitions = 100
    for _ in range(repetitions):
        execute_smoke_query()
    check_log()
def test_smoke_parallel():
    """Run the smoke DDL from 50 threads at once, then check the logs."""
    workers = [SafeThread(target=execute_smoke_query) for _ in range(50)]
    for worker in workers:
        worker.start()
    # Each join waits at most 70 seconds and re-raises a worker's exception.
    for worker in workers:
        worker.join(70)
    check_log()
def test_smoke_parallel_dict_reload():
    """Reload ``slow_dict_3`` ON CLUSTER from 90 threads at once, then check the logs."""
    workers = [
        SafeThread(target=execute_reload_dictionary_slow_dict_3)
        for _ in range(90)
    ]
    for worker in workers:
        worker.start()
    # Each join waits at most 70 seconds and re-raises a worker's exception.
    for worker in workers:
        worker.join(70)
    check_log()