mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-10 00:12:27 +00:00
d42d4cfd6b
Refs: #21264
156 lines
4.4 KiB
Python
156 lines
4.4 KiB
Python
# pylint: disable=unused-argument
|
|
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=line-too-long
|
|
|
|
from functools import wraps
|
|
import threading
|
|
import time
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
# Cluster object used by add_instance() and the start_cluster fixture in this file.
cluster = ClickHouseCluster(__file__)
|
|
|
|
# By default, exceptions thrown in threads are ignored
# (they do not mark the test as failed; they are only printed to stderr).
#
# Wrap threading.Thread and re-throw the exception on join()
|
|
class SafeThread(threading.Thread):
    """Thread that remembers an exception raised by its target and re-raises it from join().

    Args:
        target: zero-argument callable executed in the thread.

    NOTE: threading.Thread.join() returns normally when the timeout expires,
    so join() only re-raises once the target has actually failed; a target that
    is still running after the timeout is not reported here.
    """

    def __init__(self, target):
        super().__init__()
        self.target = target
        # Exception raised by target(); stays None while nothing has been raised.
        self.exception = None

    def run(self):
        """Call the target and store any Exception it raises instead of propagating it."""
        try:
            self.target()
        except Exception as e:  # pylint: disable=broad-except
            self.exception = e

    def join(self, timeout=None):
        """Wait for the thread like Thread.join(), then re-raise the stored exception, if any."""
        super().join(timeout)
        # Compare against None explicitly: an exception instance may be falsy
        # (e.g. its class defines __bool__ or __len__) and must not be swallowed.
        if self.exception is not None:
            raise self.exception
|
|
|
|
def add_instance(name, ddl_config=None):
    """Register an instance called `name` in the module-level cluster and return it.

    Every instance gets 'configs/remote_servers.xml' as a main config,
    'configs/dict.xml' as a dictionary config and with_zookeeper=True.

    Args:
        name: instance name passed through to cluster.add_instance().
        ddl_config: optional extra main config path; appended when truthy.

    Returns:
        Whatever cluster.add_instance() returns.
    """
    extra_configs = [ddl_config] if ddl_config else []
    return cluster.add_instance(
        name,
        main_configs=['configs/remote_servers.xml'] + extra_configs,
        dictionaries=['configs/dict.xml'],
        with_zookeeper=True,
    )
|
|
|
|
# Instance the test queries below are sent to.
initiator = add_instance('initiator')
# distributed_ddl.pool_size = 2
n1, n2 = (add_instance(node, 'configs/ddl_a.xml') for node in ('n1', 'n2'))
# distributed_ddl.pool_size = 20
n3, n4 = (add_instance(node, 'configs/ddl_b.xml') for node in ('n3', 'n4'))
|
|
|
|
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
    """Module-scoped autouse fixture: start the cluster, yield it, then shut it down."""
    try:
        cluster.start()
        yield cluster
    finally:
        # Reached on normal teardown and also when cluster.start() raised.
        cluster.shutdown()
|
|
|
|
# Verifies that the decorated function runs for at least `sec` seconds
|
|
def longer_then(sec):
    """Build a decorator asserting that the wrapped callable runs for at least `sec` seconds.

    Args:
        sec: minimum allowed duration of one call, in seconds.

    Returns:
        A decorator. The decorated callable forwards its arguments and return
        value unchanged and raises AssertionError when the call finished in
        less than `sec` seconds.
    """
    def wrapper(func):
        @wraps(func)
        def inner(*args, **kwargs):
            # Use the monotonic clock: unlike time.time() it cannot jump when
            # the system clock is adjusted during the measured call.
            started = time.monotonic()
            result = func(*args, **kwargs)
            took = time.monotonic() - started
            assert took >= sec, '{} took {:.3f}s, expected at least {}s'.format(
                getattr(func, '__name__', repr(func)), took, sec)
            return result
        return inner
    return wrapper
|
|
|
|
# It takes 7 seconds to load slow_dict_7.
|
|
def execute_reload_dictionary_slow_dict_7():
    """Reload slow_dict_7 ON CLUSTER cluster_a through the initiator (DDL task timeout 60)."""
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7',
        settings=ddl_settings,
    )
|
|
def execute_reload_dictionary_slow_dict_3():
    """Reload slow_dict_3 ON CLUSTER cluster_b through the initiator (DDL task timeout 60)."""
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3',
        settings=ddl_settings,
    )
|
|
def execute_smoke_query():
    """Run 'DROP DATABASE IF EXISTS foo' ON CLUSTER cluster_b through the initiator (DDL task timeout 60)."""
    ddl_settings = {'distributed_ddl_task_timeout': 60}
    initiator.query(
        'DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b',
        settings=ddl_settings,
    )
|
|
|
|
def check_log():
    """Assert that no instance log contains 'Coordination::Exception: Node exists'.

    Raises:
        AssertionError: when the message is found in an instance's log; the
            error text includes the key of that instance in cluster.instances
            (presumably the instance name -- confirm against the cluster helper).
    """
    # Ensure that none of the tasks was processed multiple times.
    # Iterate the dict view directly; no list() copy is needed because the
    # mapping is not modified inside the loop.
    for name, instance in cluster.instances.items():
        assert not instance.contains_in_log('Coordination::Exception: Node exists'), \
            "'Coordination::Exception: Node exists' found in the log of instance {!r}".format(name)
|
|
|
|
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
|
|
|
|
def test_slow_dict_load_7():
    """Reload slow_dict_7 on the initiator only and require the call to take at least 7 seconds."""
    # NOTE(review): pytest normally acts on marks of collected test items only;
    # this mark decorates an inner function that is called directly, so the
    # 10-second limit may not actually be enforced -- confirm.
    @pytest.mark.timeout(10)
    @longer_then(7)
    def reload_on_initiator():
        initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')

    reload_on_initiator()
|
|
|
|
def test_all_in_parallel():
    """Run two ON CLUSTER reloads of slow_dict_7 concurrently, require at least 7 seconds, then check the logs."""
    # NOTE(review): the timeout mark sits on a directly-called inner function
    # and may therefore not be enforced by pytest -- confirm.
    @pytest.mark.timeout(10)
    @longer_then(7)
    def run_reloads():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(2)
        ]
        for worker in workers:
            worker.start()
        # join() re-raises an exception stored by a failed worker.
        for worker in workers:
            worker.join(70)

    run_reloads()
    check_log()
|
|
|
|
def test_two_in_parallel_two_queued():
    """Run four ON CLUSTER reloads of slow_dict_7 concurrently, require at least 14 seconds, then check the logs."""
    # NOTE(review): the timeout mark sits on a directly-called inner function
    # and may therefore not be enforced by pytest -- confirm.
    @pytest.mark.timeout(19)
    @longer_then(14)
    def run_reloads():
        workers = [
            SafeThread(target=execute_reload_dictionary_slow_dict_7)
            for _ in range(4)
        ]
        for worker in workers:
            worker.start()
        # join() re-raises an exception stored by a failed worker.
        for worker in workers:
            worker.join(70)

    run_reloads()
    check_log()
|
|
|
|
def test_smoke():
    """Issue the smoke DDL query 100 times one after another, then check the logs."""
    attempts = 100
    for _ in range(attempts):
        execute_smoke_query()
    check_log()
|
|
|
|
def test_smoke_parallel():
    """Issue the smoke DDL query from 100 threads at once, then check the logs."""
    workers = [SafeThread(target=execute_smoke_query) for _ in range(100)]
    for worker in workers:
        worker.start()
    # join() re-raises an exception stored by a failed worker.
    for worker in workers:
        worker.join(70)
    check_log()
|
|
|
|
def test_smoke_parallel_dict_reload():
    """Reload slow_dict_3 ON CLUSTER from 100 threads at once, then check the logs."""
    workers = [
        SafeThread(target=execute_reload_dictionary_slow_dict_3)
        for _ in range(100)
    ]
    for worker in workers:
        worker.start()
    # join() re-raises an exception stored by a failed worker.
    for worker in workers:
        worker.join(70)
    check_log()
|