Improve test_distributed_ddl_parallel to cover more cases

Refs: #21264
Azat Khuzhin 2021-02-24 21:22:36 +03:00
parent 7d51ae3212
commit d42d4cfd6b
5 changed files with 126 additions and 39 deletions

configs/ddl_a.xml

@@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">2</pool_size>
</distributed_ddl>
</yandex>

configs/ddl_b.xml

@@ -0,0 +1,5 @@
<yandex>
<distributed_ddl>
<pool_size replace="1">20</pool_size>
</distributed_ddl>
</yandex>
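These two override files pin distributed_ddl.pool_size, the number of worker threads each server uses to process its distributed DDL queue: 2 for the nodes that will form cluster_a and 20 for the nodes of cluster_b (see remote_servers.xml and the add_instance calls below). A rough model of the queueing behaviour this produces, ignoring scheduling and ZooKeeper overhead, is sketched below; min_wall_time is an illustrative helper, not part of the test.

import math

def min_wall_time(tasks, pool_size, task_seconds):
    # With pool_size DDL worker threads per server, `tasks` identical
    # long-running tasks are processed in ceil(tasks / pool_size) waves.
    return math.ceil(tasks / pool_size) * task_seconds

# cluster_a (pool_size=2): four 7-second dictionary reloads need two waves,
# i.e. at least 14 seconds, which is consistent with the 19-second timeout
# on test_two_in_parallel_two_queued below.
assert min_wall_time(4, 2, 7) == 14

# cluster_b (pool_size=20): a hundred 3-second reloads fit into five waves,
# roughly 15 seconds, comfortably under the 70-second join timeout used in
# test_smoke_parallel_dict_reload.
assert min_wall_time(100, 20, 3) == 15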

configs/dict.xml

@@ -1,26 +1,50 @@
<?xml version="1.0"?>
<yandex>
<dictionary>
<name>slow_dict</name>
<source>
<executable>
<command>sleep 7</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>slow_dict_7</name>
<source>
<executable>
<command>sleep 7</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
<dictionary>
<name>slow_dict_3</name>
<source>
<executable>
<command>sleep 3</command>
<format>TabSeparated</format>
</executable>
</source>
<layout>
<flat/>
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>value</name>
<type>String</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>
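Both dictionaries load through an executable source whose command is just sleep, so reloading slow_dict_7 always takes about 7 seconds and slow_dict_3 about 3 seconds; SYSTEM RELOAD DICTIONARY blocks for that long, which is what the timing assertions in the test build on. A minimal sketch of that idea (the helper name is illustrative, not part of the test):

import time

def assert_reload_takes_at_least(node, dictionary, seconds):
    # Reloading an executable-source dictionary re-runs its command, so the
    # query cannot return before the `sleep N` inside the source finishes.
    start = time.time()
    node.query('SYSTEM RELOAD DICTIONARY {}'.format(dictionary))
    assert time.time() - start >= seconds

# e.g. assert_reload_takes_at_least(initiator, 'slow_dict_7', 7)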

configs/remote_servers.xml

@@ -1,6 +1,6 @@
<yandex>
<remote_servers>
<cluster>
<cluster_a>
<shard>
<replica>
<host>n1</host>
@@ -13,6 +13,20 @@
<port>9000</port>
</replica>
</shard>
</cluster>
</cluster_a>
<cluster_b>
<shard>
<replica>
<host>n3</host>
<port>9000</port>
</replica>
</shard>
<shard>
<replica>
<host>n4</host>
<port>9000</port>
</replica>
</shard>
</cluster_b>
</remote_servers>
</yandex>
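remote_servers.xml now defines two two-node clusters instead of a single one: cluster_a (n1, n2), whose nodes carry the pool_size=2 override, and cluster_b (n3, n4), whose nodes carry pool_size=20. A hedged sketch of how one could verify that an ON CLUSTER statement fans out to every replica of the chosen cluster (probe is a hypothetical table name, not used by the test):

def assert_ddl_reaches_all_replicas(initiator, cluster_name, nodes):
    # Create a table on the whole cluster, then check every node sees it.
    initiator.query(
        'CREATE TABLE IF NOT EXISTS probe ON CLUSTER {} '
        '(x UInt8) ENGINE = Memory'.format(cluster_name),
        settings={'distributed_ddl_task_timeout': 60})
    for node in nodes:
        assert node.query('EXISTS TABLE probe').strip() == '1'

# e.g. assert_ddl_reaches_all_replicas(initiator, 'cluster_b', [n3, n4])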

test.py

@@ -29,11 +29,12 @@ class SafeThread(threading.Thread):
if self.exception:
raise self.exception
def add_instance(name):
def add_instance(name, ddl_config=None):
main_configs=[
'configs/ddl.xml',
'configs/remote_servers.xml',
]
if ddl_config:
main_configs.append(ddl_config)
dictionaries=[
'configs/dict.xml',
]
@@ -43,8 +44,12 @@ def add_instance(name):
with_zookeeper=True)
initiator = add_instance('initiator')
n1 = add_instance('n1')
n2 = add_instance('n2')
# distributed_ddl.pool_size = 2
n1 = add_instance('n1', 'configs/ddl_a.xml')
n2 = add_instance('n2', 'configs/ddl_a.xml')
# distributed_ddl.pool_size = 20
n3 = add_instance('n3', 'configs/ddl_b.xml')
n4 = add_instance('n4', 'configs/ddl_b.xml')
@pytest.fixture(scope='module', autouse=True)
def start_cluster():
@@ -68,19 +73,32 @@ def longer_then(sec):
return inner
return wrapper
# It takes 7 seconds to load slow_dict.
def thread_reload_dictionary():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster slow_dict', settings={
# It takes 7 seconds to load slow_dict_7.
def execute_reload_dictionary_slow_dict_7():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_a slow_dict_7', settings={
'distributed_ddl_task_timeout': 60,
})
def execute_reload_dictionary_slow_dict_3():
initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster_b slow_dict_3', settings={
'distributed_ddl_task_timeout': 60,
})
def execute_smoke_query():
initiator.query('DROP DATABASE IF EXISTS foo ON CLUSTER cluster_b', settings={
'distributed_ddl_task_timeout': 60,
})
def check_log():
# ensure that none of the tasks was processed multiple times
for _, instance in list(cluster.instances.items()):
assert not instance.contains_in_log('Coordination::Exception: Node exists')
# NOTE: uses inner function to exclude slow start_cluster() from timeout.
def test_dict_load():
def test_slow_dict_load_7():
@pytest.mark.timeout(10)
@longer_then(7)
def inner_test():
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict')
initiator.query('SYSTEM RELOAD DICTIONARY slow_dict_7')
inner_test()
def test_all_in_parallel():
@@ -89,12 +107,13 @@ def test_all_in_parallel():
def inner_test():
threads = []
for _ in range(2):
threads.append(SafeThread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
inner_test()
check_log()
def test_two_in_parallel_two_queued():
@pytest.mark.timeout(19)
@@ -102,15 +121,35 @@ def test_two_in_parallel_two_queued():
def inner_test():
threads = []
for _ in range(4):
threads.append(SafeThread(target=thread_reload_dictionary))
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_7))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
inner_test()
check_log()
def test_smoke():
for _ in range(100):
initiator.query('DROP DATABASE IF EXISTS foo ON CLUSTER cluster', settings={
'distributed_ddl_task_timeout': 60,
})
execute_smoke_query()
check_log()
def test_smoke_parallel():
threads = []
for _ in range(100):
threads.append(SafeThread(target=execute_smoke_query))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
check_log()
def test_smoke_parallel_dict_reload():
threads = []
for _ in range(100):
threads.append(SafeThread(target=execute_reload_dictionary_slow_dict_3))
for thread in threads:
thread.start()
for thread in threads:
thread.join(70)
check_log()
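
The hunks above show only fragments of two helpers the tests rely on: SafeThread, a Thread subclass that records an exception raised inside its target and re-raises it later so that failures in worker threads fail the test, and longer_then, a decorator asserting that the wrapped call takes at least the given number of seconds. A minimal sketch of both, with the implementation details that are not visible in the diff filled in as assumptions:

import functools
import threading
import time

class SafeThread(threading.Thread):
    # Stores an exception raised by the target so callers can re-raise it
    # after join(); only the re-raise fragment is visible in the hunk above.
    def __init__(self, target):
        super().__init__()
        self.target = target
        self.exception = None

    def run(self):
        try:
            self.target()
        except Exception as e:
            self.exception = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exception:
            raise self.exception

def longer_then(sec):
    # Asserts that the decorated function runs for at least `sec` seconds
    # ("then" rather than "than" is the identifier the test actually uses).
    def wrapper(func):
        @functools.wraps(func)
        def inner(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            assert time.time() - start >= sec
            return result
        return inner
    return wrapper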