# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long

from functools import wraps
import threading
import time

import pytest
from helpers.cluster import ClickHouseCluster

cluster = ClickHouseCluster(__file__)


# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed, they are only printed to stderr).
#
# Wrap threading.Thread and re-throw the exception on join().
class SafeThread(threading.Thread):
    """Thread that remembers an exception raised by `target` and re-raises it from join()."""

    def __init__(self, target):
        super().__init__()
        self.target = target
        # Set by run() when `target` raises; re-raised by join().
        self.exception = None

    def run(self):
        """Call `target`, storing (instead of propagating) any Exception it raises."""
        try:
            self.target()
        except Exception as e:  # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread (up to `timeout` seconds) and re-raise a stored exception.

        NOTE: nothing is raised when the timeout expires while the thread is
        still running and no exception has been stored yet; callers that care
        must check is_alive() themselves.
        """
        super().join(timeout)
        if self.exception:
            raise self.exception


def add_instance(name, ddl_config=None):
    """Add instance `name` to the cluster, optionally with an extra `ddl_config` main config.

    Returns whatever cluster.add_instance() returns.
    """
    main_configs = [
        'configs/remote_servers.xml',
    ]
    if ddl_config:
        main_configs.append(ddl_config)
    dictionaries = [
        'configs/dict.xml',
    ]
    return cluster.add_instance(name,
                                main_configs=main_configs,
                                dictionaries=dictionaries,
                                with_zookeeper=True)


initiator = add_instance('initiator')
# distributed_ddl.pool_size = 2
n1 = add_instance('n1', 'configs/ddl_a.xml')
n2 = add_instance('n2', 'configs/ddl_a.xml')
# distributed_ddl.pool_size = 20
n3 = add_instance('n3', 'configs/ddl_b.xml')
n4 = add_instance('n4', 'configs/ddl_b.xml')


@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    """Start the cluster once per module and always shut it down afterwards."""
    try:
        cluster.start()
        yield cluster
    finally:
        cluster.shutdown()


# verifies that the function executes for at least `sec` seconds
def longer_then(sec):
    """Decorator factory: assert that the wrapped call takes at least `sec` seconds."""
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # A monotonic clock is immune to wall-clock adjustments, which
            # could otherwise distort the measured duration.
            started = time.monotonic()
            result = func(*args, **kwargs)
            took = time.monotonic() - started
            assert took >= sec, '{} took {:.3f} seconds, expected at least {}'.format(
                func.__name__, took, sec)
            return result
        return inner
    return wrapper


# It takes 7 seconds to load slow_dict_7.
def _execute_distributed_ddl(query):
    """Run `query` on the initiator with distributed_ddl_task_timeout set to 60."""
    initiator.query(query, settings={
        'distributed_ddl_task_timeout': 60,
    })


def execute_reload_dictionary_slow_dict_7():
    """Reload slow_dict_7 on cluster_a via the initiator."""
    _execute_distributed_ddl('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7')


def execute_reload_dictionary_slow_dict_3():
    """Reload slow_dict_3 on cluster_b via the initiator."""
    _execute_distributed_ddl('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3')


def execute_smoke_query():
    """Run a cheap ON CLUSTER query (drop of database `foo`) on cluster_b."""
    _execute_distributed_ddl('DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b')


def check_log():
    """Assert that no instance logged a 'Node exists' coordination error."""
    # ensure that none of the tasks was processed multiple times
    for instance in cluster.instances.values():
        assert not instance.contains_in_log('Coordination::Exception: Node exists')


def _run_in_parallel(target, count, join_timeout=70):
    """Start `count` SafeThreads running `target`, then join each one in turn.

    All threads are started before the first join. Each join waits at most
    `join_timeout` seconds and re-raises an exception stored by that thread.
    """
    threads = [SafeThread(target=target) for _ in range(count)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join(join_timeout)


# NOTE: uses inner function to exclude slow start_cluster() from timeout.
# NOTE(review): pytest marks are normally read from collected test items;
# a mark placed on an inner function that is called directly is presumably
# not enforced -- confirm whether these timeouts actually fire.
def test_slow_dict_load_7():
    """A local (non-cluster) reload of slow_dict_7 takes at least 7 seconds."""
    @pytest.mark.timeout(10)
    @longer_then(7)
    def inner_test():
        initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')
    inner_test()


def test_all_in_parallel():
    """Two concurrent cluster_a reloads take at least 7 seconds in total."""
    @pytest.mark.timeout(10)
    @longer_then(7)
    def inner_test():
        _run_in_parallel(execute_reload_dictionary_slow_dict_7, 2)
    inner_test()
    check_log()


def test_two_in_parallel_two_queued():
    """Four concurrent cluster_a reloads take at least 14 seconds in total."""
    @pytest.mark.timeout(19)
    @longer_then(14)
    def inner_test():
        _run_in_parallel(execute_reload_dictionary_slow_dict_7, 4)
    inner_test()
    check_log()


def test_smoke():
    """Run the smoke query 100 times sequentially, then check the logs."""
    for _ in range(100):
        execute_smoke_query()
    check_log()


def test_smoke_parallel():
    """Run the smoke query from 100 threads at once, then check the logs."""
    _run_in_parallel(execute_smoke_query, 100)
    check_log()


def test_smoke_parallel_dict_reload():
    """Reload slow_dict_3 on cluster_b from 100 threads at once, then check the logs."""
    _run_in_parallel(execute_reload_dictionary_slow_dict_3, 100)
    check_log()