mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 17:32:32 +00:00
188 lines
4.5 KiB
Python
188 lines
4.5 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
from functools import wraps
|
|
import threading
|
|
import time
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
|
|
# By default the exceptions that was throwed in threads will be ignored
|
|
# (they will not mark the test as failed, only printed to stderr).
|
|
#
|
|
# Wrap thrading.Thread and re-throw exception on join()
|
|
class SafeThread(threading.Thread):
|
|
def __init__(self, target):
|
|
super().__init__()
|
|
self.target = target
|
|
self.exception = None
|
|
|
|
def run(self):
|
|
try:
|
|
self.target()
|
|
except Exception as e: # pylint: disable=broad-except
|
|
self.exception = e
|
|
|
|
def join(self, timeout=None):
|
|
super().join(timeout)
|
|
if self.exception:
|
|
raise self.exception
|
|
|
|
|
|
def add_instance(name, ddl_config=None):
|
|
main_configs = [
|
|
"configs/remote_servers.xml",
|
|
]
|
|
if ddl_config:
|
|
main_configs.append(ddl_config)
|
|
dictionaries = [
|
|
"configs/dict.xml",
|
|
]
|
|
return cluster.add_instance(
|
|
name, main_configs=main_configs, dictionaries=dictionaries, with_zookeeper=True
|
|
)
|
|
|
|
|
|
initiator = add_instance("initiator")
|
|
# distributed_ddl.pool_size = 2
|
|
n1 = add_instance("n1", "configs/ddl_a.xml")
|
|
n2 = add_instance("n2", "configs/ddl_a.xml")
|
|
# distributed_ddl.pool_size = 20
|
|
n3 = add_instance("n3", "configs/ddl_b.xml")
|
|
n4 = add_instance("n4", "configs/ddl_b.xml")
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def start_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
# verifies that functions executes longer then `sec`
|
|
def longer_then(sec):
|
|
def wrapper(func):
|
|
@wraps(func)
|
|
def inner(*args, **kwargs):
|
|
ts = time.time()
|
|
result = func(*args, **kwargs)
|
|
te = time.time()
|
|
took = te - ts
|
|
assert took >= sec
|
|
return result
|
|
|
|
return inner
|
|
|
|
return wrapper
|
|
|
|
|
|
# It takes 7 seconds to load slow_dict_7.
|
|
def execute_reload_dictionary_slow_dict_7():
|
|
initiator.query(
|
|
"SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7",
|
|
settings={
|
|
"distributed_ddl_task_timeout": 60,
|
|
},
|
|
)
|
|
|
|
|
|
def execute_reload_dictionary_slow_dict_3():
|
|
initiator.query(
|
|
"SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3",
|
|
settings={
|
|
"distributed_ddl_task_timeout": 60,
|
|
},
|
|
)
|
|
|
|
|
|
def execute_smoke_query():
|
|
initiator.query(
|
|
"DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b",
|
|
settings={
|
|
"distributed_ddl_task_timeout": 60,
|
|
},
|
|
)
|
|
|
|
|
|
def check_log():
|
|
# ensure that none of tasks processed multiple times
|
|
for _, instance in list(cluster.instances.items()):
|
|
assert not instance.contains_in_log("Coordination::Exception: Node exists")
|
|
|
|
|
|
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
|
|
|
|
|
|
def test_slow_dict_load_7():
|
|
@pytest.mark.timeout(10)
|
|
@longer_then(7)
|
|
def inner_test():
|
|
initiator.query("SYSTEM RELOAD DICTIONARY slow_dict_7")
|
|
|
|
inner_test()
|
|
|
|
|
|
def test_all_in_parallel():
|
|
@pytest.mark.timeout(10)
|
|
@longer_then(7)
|
|
def inner_test():
|
|
threads = []
|
|
for _ in range(2):
|
|
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(70)
|
|
|
|
inner_test()
|
|
check_log()
|
|
|
|
|
|
def test_two_in_parallel_two_queued():
|
|
@pytest.mark.timeout(19)
|
|
@longer_then(14)
|
|
def inner_test():
|
|
threads = []
|
|
for _ in range(4):
|
|
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(70)
|
|
|
|
inner_test()
|
|
check_log()
|
|
|
|
|
|
def test_smoke():
|
|
for _ in range(100):
|
|
execute_smoke_query()
|
|
check_log()
|
|
|
|
|
|
def test_smoke_parallel():
|
|
threads = []
|
|
for _ in range(100):
|
|
threads.append(SafeThread(target=execute_smoke_query))
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(70)
|
|
check_log()
|
|
|
|
|
|
def test_smoke_parallel_dict_reload():
|
|
threads = []
|
|
for _ in range(100):
|
|
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_3))
|
|
for thread in threads:
|
|
thread.start()
|
|
for thread in threads:
|
|
thread.join(70)
|
|
check_log()
|