ClickHouse/tests/integration/test_distributed_ddl_parallel/test.py
2021-02-28 05:45:40 +03:00

156 lines
4.4 KiB
Python

# pylint: disable=unused-argument
# pylint: disable=redefined-outer-name
# pylint: disable=line-too-long
from functools import wraps
import threading
import time
import pytest
from helpers.cluster import ClickHouseCluster
# Module-level cluster object shared by the whole file: add_instance() registers
# instances on it, and the start_cluster fixture starts it and shuts it down.
cluster = ClickHouseCluster(__file__)
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed; they are only printed to stderr).
#
# Wrap threading.Thread and re-raise the exception on join().
class SafeThread(threading.Thread):
    """Thread that remembers an exception raised by its target and re-raises it on join().

    Without this, an exception raised inside a worker thread would not
    propagate to the thread that started it.

    Args:
        target: zero-argument callable executed in the thread.

    Attributes are set in __init__:
        target: the callable passed to the constructor.
        exception: the exception raised by ``target`` (``None`` if it has not raised).
    """

    def __init__(self, target):
        super().__init__()
        self.target = target
        self.exception = None

    def run(self):
        """Call the target, storing (instead of propagating) any Exception it raises."""
        try:
            self.target()
        except Exception as e:  # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread as threading.Thread.join() does, then re-raise a stored exception.

        Args:
            timeout: forwarded unchanged to threading.Thread.join().

        Raises:
            Exception: whatever the target raised, if anything has been recorded
                by the time the underlying join() returns.
        """
        super().join(timeout)
        # Compare against None explicitly: an exception class may define
        # __bool__/__len__ and be falsy, and it still has to be re-raised.
        if self.exception is not None:
            raise self.exception
def add_instance(name, ddl_config=None):
    """Register an instance called `name` on the module-level cluster and return it.

    Every instance gets 'configs/remote_servers.xml' as a main config,
    'configs/dict.xml' as a dictionary config and with_zookeeper=True.

    Args:
        name: instance name passed to cluster.add_instance().
        ddl_config: optional path of an extra main config; appended after
            remote_servers.xml when truthy.

    Returns:
        Whatever cluster.add_instance() returns.
    """
    extra_configs = [ddl_config] if ddl_config else []
    return cluster.add_instance(
        name,
        main_configs=['configs/remote_servers.xml'] + extra_configs,
        dictionaries=['configs/dict.xml'],
        with_zookeeper=True,
    )
# Instance on which the execute_* helpers and test_slow_dict_load_7 run their
# queries; it is created without an extra DDL config.
initiator = add_instance('initiator')
# distributed_ddl.pool_size = 2
# NOTE(review): presumably n1/n2 make up cluster_a (defined in
# configs/remote_servers.xml, not visible here) -- confirm.
n1 = add_instance('n1', 'configs/ddl_a.xml')
n2 = add_instance('n2', 'configs/ddl_a.xml')
# distributed_ddl.pool_size = 20
# NOTE(review): presumably n3/n4 make up cluster_b -- confirm against
# configs/remote_servers.xml.
n3 = add_instance('n3', 'configs/ddl_b.xml')
n4 = add_instance('n4', 'configs/ddl_b.xml')
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    """Module-scoped, autouse fixture: start the cluster, yield it, always shut it down.

    cluster.start() sits inside the try block, so cluster.shutdown() is also
    called when start-up itself raises.
    """
    try:
        cluster.start()
        # Everything using this fixture runs while suspended at this yield.
        yield cluster
    finally:
        cluster.shutdown()
# Verifies that the decorated function runs for at least `sec` seconds.
def longer_then(sec):
    """Return a decorator asserting that each call of the wrapped function takes >= `sec` seconds.

    Args:
        sec: minimum allowed duration of a call, in seconds.

    Returns:
        A decorator. The decorated function forwards all arguments, returns the
        wrapped function's result, and raises AssertionError when the call
        finished in less than `sec` seconds.
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Use the monotonic clock: unlike time.time() it cannot go
            # backwards or jump when the system clock is adjusted.
            started = time.monotonic()
            result = func(*args, **kwargs)
            took = time.monotonic() - started
            assert took >= sec, \
                'call took %.3f sec, expected at least %s sec' % (took, sec)
            return result
        return inner
    return wrapper
# It takes 7 seconds to load slow_dict_7.
def execute_reload_dictionary_slow_dict_7():
    """Reload slow_dict_7 ON CLUSTER cluster_a through the initiator.

    The query runs with distributed_ddl_task_timeout set to 60.
    """
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7',
        settings=ddl_settings,
    )
def execute_reload_dictionary_slow_dict_3():
    """Reload slow_dict_3 ON CLUSTER cluster_b through the initiator.

    The query runs with distributed_ddl_task_timeout set to 60.
    """
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3',
        settings=ddl_settings,
    )
def execute_smoke_query():
    """Issue a cheap distributed DDL (DROP DATABASE IF EXISTS foo) ON CLUSTER cluster_b.

    The query runs through the initiator with distributed_ddl_task_timeout set to 60.
    """
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b',
        settings=ddl_settings,
    )
def check_log():
    """Assert that no instance's log contains 'Coordination::Exception: Node exists'.

    Per the original author's note, this ensures that none of the tasks was
    processed multiple times.

    Raises:
        AssertionError: if the message is found; the error text includes the
            key under which the offending instance is stored in cluster.instances.
    """
    # Iterate the dict view directly; no snapshot copy is needed because
    # nothing in this loop adds or removes instances.
    for key, instance in cluster.instances.items():
        assert not instance.contains_in_log('Coordination::Exception: Node exists'), \
            "'Node exists' found in the log of instance %r" % (key,)
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
def test_slow_dict_load_7():
    """Reload slow_dict_7 on the initiator (no ON CLUSTER) and require it to take >= 7 seconds."""
    # NOTE(review): pytest marks are normally consumed for collected test
    # items; this nested function is called directly, so the timeout mark
    # may have no effect here -- confirm.
    @pytest.mark.timeout(10)
    @longer_then(7)
    def reload_on_initiator():
        initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')

    reload_on_initiator()
def test_all_in_parallel():
    """Run two slow_dict_7 cluster reloads concurrently; together they must take >= 7 seconds."""
    # NOTE(review): the timeout mark decorates a nested function that is
    # called directly rather than collected by pytest -- confirm it is enforced.
    @pytest.mark.timeout(10)
    @longer_then(7)
    def run_reloads():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(2)
        ]
        for worker in workers:
            worker.start()
        # SafeThread.join() re-raises whatever the worker raised.
        for worker in workers:
            worker.join(70)

    run_reloads()
    check_log()
def test_two_in_parallel_two_queued():
    """Run four slow_dict_7 cluster reloads concurrently; together they must take >= 14 seconds."""
    # NOTE(review): the timeout mark decorates a nested function that is
    # called directly rather than collected by pytest -- confirm it is enforced.
    @pytest.mark.timeout(19)
    @longer_then(14)
    def run_reloads():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(4)
        ]
        for worker in workers:
            worker.start()
        # SafeThread.join() re-raises whatever the worker raised.
        for worker in workers:
            worker.join(70)

    run_reloads()
    check_log()
def test_smoke():
    """Run the smoke DDL query 100 times sequentially, then verify the logs."""
    remaining = 100
    while remaining > 0:
        execute_smoke_query()
        remaining -= 1
    check_log()
def test_smoke_parallel():
    """Run the smoke DDL query from 100 concurrent threads, then verify the logs."""
    workers = [SafeThread(target=execute_smoke_query) for _ in range(100)]
    for worker in workers:
        worker.start()
    # SafeThread.join() re-raises whatever the worker raised.
    for worker in workers:
        worker.join(70)
    check_log()
def test_smoke_parallel_dict_reload():
    """Reload slow_dict_3 on cluster_b from 100 concurrent threads, then verify the logs."""
    workers = [
        SafeThread(target=execute_reload_dictionary_slow_dict_3)
        for _ in range(100)
    ]
    for worker in workers:
        worker.start()
    # SafeThread.join() re-raises whatever the worker raised.
    for worker in workers:
        worker.join(70)
    check_log()