From 9c7f3a9a742fb9b96c176b22b85f4d0a9e8a306c Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sat, 12 Sep 2020 02:33:17 +0300 Subject: [PATCH] Add test_distributed_ddl_parallel --- .../test_distributed_ddl_parallel/__init__.py | 0 .../configs/ddl.xml | 5 ++ .../configs/dict.xml | 26 ++++++ .../configs/remote_servers.xml | 18 ++++ .../test_distributed_ddl_parallel/test.py | 89 +++++++++++++++++++ 5 files changed, 138 insertions(+) create mode 100644 tests/integration/test_distributed_ddl_parallel/__init__.py create mode 100644 tests/integration/test_distributed_ddl_parallel/configs/ddl.xml create mode 100644 tests/integration/test_distributed_ddl_parallel/configs/dict.xml create mode 100644 tests/integration/test_distributed_ddl_parallel/configs/remote_servers.xml create mode 100644 tests/integration/test_distributed_ddl_parallel/test.py diff --git a/tests/integration/test_distributed_ddl_parallel/__init__.py b/tests/integration/test_distributed_ddl_parallel/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_distributed_ddl_parallel/configs/ddl.xml b/tests/integration/test_distributed_ddl_parallel/configs/ddl.xml new file mode 100644 index 00000000000..b926f99c687 --- /dev/null +++ b/tests/integration/test_distributed_ddl_parallel/configs/ddl.xml @@ -0,0 +1,5 @@ + + + 2 + + diff --git a/tests/integration/test_distributed_ddl_parallel/configs/dict.xml b/tests/integration/test_distributed_ddl_parallel/configs/dict.xml new file mode 100644 index 00000000000..610d55841a0 --- /dev/null +++ b/tests/integration/test_distributed_ddl_parallel/configs/dict.xml @@ -0,0 +1,26 @@ + + + + slow_dict + + + sleep 7 + TabSeparated + + + + + + + + id + + + value + String + + + + 0 + + diff --git a/tests/integration/test_distributed_ddl_parallel/configs/remote_servers.xml b/tests/integration/test_distributed_ddl_parallel/configs/remote_servers.xml new file mode 100644 index 00000000000..8ffa9f024d7 --- /dev/null +++ b/tests/integration/test_distributed_ddl_parallel/configs/remote_servers.xml @@ -0,0 +1,18 @@ + + + + + + n1 + 9000 + + + + + n2 + 9000 + + + + + diff --git a/tests/integration/test_distributed_ddl_parallel/test.py b/tests/integration/test_distributed_ddl_parallel/test.py new file mode 100644 index 00000000000..96530b111cb --- /dev/null +++ b/tests/integration/test_distributed_ddl_parallel/test.py @@ -0,0 +1,89 @@ +# pylint: disable=unused-argument +# pylint: disable=redefined-outer-name +# pylint: disable=line-too-long + +from functools import wraps +import threading +import time +import pytest +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) + +def add_instance(name): + main_configs=[ + 'configs/ddl.xml', + 'configs/remote_servers.xml', + ] + dictionaries=[ + 'configs/dict.xml', + ] + return cluster.add_instance(name, + main_configs=main_configs, + dictionaries=dictionaries, + with_zookeeper=True) + +initiator = add_instance('initiator') +n1 = add_instance('n1') +n2 = add_instance('n2') + +@pytest.fixture(scope='module', autouse=True) +def start_cluster(): + try: + cluster.start() + yield cluster + finally: + cluster.shutdown() + +# verifies that functions executes longer then `sec` +def longer_then(sec): + def wrapper(func): + @wraps(func) + def inner(*args, **kwargs): + ts = time.time() + result = func(*args, **kwargs) + te = time.time() + took = te-ts + assert took >= sec + return result + return inner + return wrapper + +# It takes 7 seconds to load slow_dict. +def thread_reload_dictionary(): + initiator.query('SYSTEM RELOAD DICTIONARY ON CLUSTER cluster slow_dict') + +# NOTE: uses inner function to exclude slow start_cluster() from timeout. + +def test_dict_load(): + @pytest.mark.timeout(10) + @longer_then(7) + def inner_test(): + initiator.query('SYSTEM RELOAD DICTIONARY slow_dict') + inner_test() + +def test_all_in_parallel(): + @pytest.mark.timeout(10) + @longer_then(7) + def inner_test(): + threads = [] + for _ in range(2): + threads.append(threading.Thread(target=thread_reload_dictionary)) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + inner_test() + +def test_two_in_parallel_two_queued(): + @pytest.mark.timeout(19) + @longer_then(14) + def inner_test(): + threads = [] + for _ in range(4): + threads.append(threading.Thread(target=thread_reload_dictionary)) + for thread in threads: + thread.start() + for thread in threads: + thread.join() + inner_test()