diff --git a/docker/test/testflows/runner/Dockerfile b/docker/test/testflows/runner/Dockerfile index 9fa028fedca..264b98c669d 100644 --- a/docker/test/testflows/runner/Dockerfile +++ b/docker/test/testflows/runner/Dockerfile @@ -35,7 +35,7 @@ RUN apt-get update \ ENV TZ=Europe/Moscow RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone -RUN pip3 install urllib3 testflows==1.6.90 docker-compose==1.29.1 docker==5.0.0 dicttoxml kazoo tzlocal python-dateutil numpy +RUN pip3 install urllib3 testflows==1.7.20 docker-compose==1.29.1 docker==5.0.0 dicttoxml kazoo tzlocal python-dateutil numpy ENV DOCKER_CHANNEL stable ENV DOCKER_VERSION 20.10.6 diff --git a/tests/testflows/aes_encryption/regression.py b/tests/testflows/aes_encryption/regression.py index 60f42ece509..93add36cd1d 100755 --- a/tests/testflows/aes_encryption/regression.py +++ b/tests/testflows/aes_encryption/regression.py @@ -6,7 +6,6 @@ from testflows.core import * append_path(sys.path, "..") from helpers.cluster import Cluster -from helpers.common import Pool, join, run_scenario from helpers.argparser import argparser from aes_encryption.requirements import * @@ -52,19 +51,19 @@ xfails = { [(Fail, issue_18250)], "compatibility/mysql/:engine/encrypt/mysql_datatype='VARCHAR(100)'/:": [(Fail, issue_18250)], - # reinterpretAsFixedString for UUID stopped working + # reinterpretAsFixedString for UUID stopped working "decrypt/decryption/mode=:datatype=UUID:": - [(Fail, issue_24029)], + [(Fail, issue_24029)], "encrypt/:/mode=:datatype=UUID:": - [(Fail, issue_24029)], - "decrypt/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:": - [(Fail, issue_24029)], + [(Fail, issue_24029)], + "decrypt/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:": + [(Fail, issue_24029)], "encrypt_mysql/encryption/mode=:datatype=UUID:": - [(Fail, issue_24029)], + [(Fail, issue_24029)], "decrypt_mysql/decryption/mode=:datatype=UUID:": - [(Fail, issue_24029)], + [(Fail, issue_24029)], "decrypt_mysql/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:": - [(Fail, issue_24029)], + [(Fail, issue_24029)], } @TestFeature @@ -76,33 +75,29 @@ xfails = { RQ_SRS008_AES_Functions_DifferentModes("1.0") ) @XFails(xfails) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """ClickHouse AES encryption functions regression module. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"), } if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "aes_encryption_env")) as cluster: self.context.cluster = cluster - tasks = [] with Pool(5) as pool: try: - run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.encrypt", "feature"), flags=TE)) - run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.decrypt", "feature"), flags=TE)) - run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.encrypt_mysql", "feature"), flags=TE)) - run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.decrypt_mysql", "feature"), flags=TE)) - run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.compatibility.feature", "feature"), flags=TE)) + Feature(run=load("aes_encryption.tests.encrypt", "feature"), flags=TE, parallel=True, executor=pool) + Feature(run=load("aes_encryption.tests.decrypt", "feature"), flags=TE, parallel=True, executor=pool) + Feature(run=load("aes_encryption.tests.encrypt_mysql", "feature"), flags=TE, parallel=True, executor=pool) + Feature(run=load("aes_encryption.tests.decrypt_mysql", "feature"), flags=TE, parallel=True, executor=pool) + Feature(run=load("aes_encryption.tests.compatibility.feature", "feature"), flags=TE, parallel=True, executor=pool) finally: - join(tasks) + join() if main(): regression() diff --git a/tests/testflows/datetime64_extended_range/regression.py b/tests/testflows/datetime64_extended_range/regression.py index f8db9c74c9e..062c36660ed 100755 --- a/tests/testflows/datetime64_extended_range/regression.py +++ b/tests/testflows/datetime64_extended_range/regression.py @@ -61,33 +61,29 @@ xfails = { RQ_SRS_010_DateTime64_ExtendedRange("1.0"), ) @XFails(xfails) -def regression(self, local, clickhouse_binary_path, parallel=False, stress=False): +def regression(self, local, clickhouse_binary_path, stress=False): """ClickHouse DateTime64 Extended Range regression module. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"), } if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "datetime64_extended_range_env")) as cluster: self.context.cluster = cluster - tasks = [] with Pool(2) as pool: try: - run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.generic", "generic"))) - run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.non_existent_time", "feature"))) - run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.reference_times", "reference_times"))) - run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.date_time_functions", "date_time_funcs"))) - run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.type_conversion", "type_conversion"))) + Scenario(run=load("datetime64_extended_range.tests.generic", "generic"), parallel=True, executor=pool) + Scenario(run=load("datetime64_extended_range.tests.non_existent_time", "feature"), parallel=True, executor=pool) + Scenario(run=load("datetime64_extended_range.tests.reference_times", "reference_times"), parallel=True, executor=pool) + Scenario(run=load("datetime64_extended_range.tests.date_time_functions", "date_time_funcs"), parallel=True, executor=pool) + Scenario(run=load("datetime64_extended_range.tests.type_conversion", "type_conversion"), parallel=True, executor=pool) finally: - join(tasks) + join() if main(): regression() diff --git a/tests/testflows/datetime64_extended_range/tests/common.py b/tests/testflows/datetime64_extended_range/tests/common.py index f30381d2f9e..c3bee076bf4 100644 --- a/tests/testflows/datetime64_extended_range/tests/common.py +++ b/tests/testflows/datetime64_extended_range/tests/common.py @@ -128,7 +128,6 @@ def walk_datetime_in_incrementing_steps(self, date, hrs_range=(0, 24), step=1, t stress = self.context.stress secs = f"00{'.' * (precision > 0)}{'0' * precision}" - tasks = [] with Pool(2) as pool: try: with When(f"I loop through datetime range {hrs_range} starting from {date} in {step}min increments"): @@ -138,11 +137,11 @@ def walk_datetime_in_incrementing_steps(self, date, hrs_range=(0, 24), step=1, t expected = datetime with When(f"time is {datetime}"): - run_scenario(pool, tasks, Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime), - kwargs=dict(datetime=datetime, precision=precision, timezone=timezone, - expected=expected)) + Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime, parallel=True, executor=pool)( + datetime=datetime, precision=precision, timezone=timezone, + expected=expected) finally: - join(tasks) + join() @TestStep @@ -159,7 +158,6 @@ def walk_datetime_in_decrementing_steps(self, date, hrs_range=(23, 0), step=1, t stress = self.context.stress secs = f"00{'.' * (precision > 0)}{'0' * precision}" - tasks = [] with Pool(2) as pool: try: with When(f"I loop through datetime range {hrs_range} starting from {date} in {step}min decrements"): @@ -169,8 +167,8 @@ def walk_datetime_in_decrementing_steps(self, date, hrs_range=(23, 0), step=1, t expected = datetime with When(f"time is {datetime}"): - run_scenario(pool, tasks, Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime), - kwargs=dict(datetime=datetime, precision=precision, timezone=timezone, - expected=expected)) + Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime, parallel=True, executor=pool)( + datetime=datetime, precision=precision, timezone=timezone, + expected=expected) finally: - join(tasks) + join() diff --git a/tests/testflows/datetime64_extended_range/tests/date_time_functions.py b/tests/testflows/datetime64_extended_range/tests/date_time_functions.py index b0b22f82452..7338f34668a 100644 --- a/tests/testflows/datetime64_extended_range/tests/date_time_functions.py +++ b/tests/testflows/datetime64_extended_range/tests/date_time_functions.py @@ -3,7 +3,7 @@ import pytz import itertools from testflows.core import * import dateutil.relativedelta as rd -from datetime import datetime, timedelta +from datetime import datetime from datetime64_extended_range.requirements.requirements import * from datetime64_extended_range.common import * @@ -1536,10 +1536,9 @@ def date_time_funcs(self, node="clickhouse1"): """ self.context.node = self.context.cluster.node(node) - tasks = [] with Pool(4) as pool: try: for scenario in loads(current_module(), Scenario): - run_scenario(pool, tasks, Scenario(test=scenario)) + Scenario(run=scenario, parallel=True, executor=pool) finally: - join(tasks) + join() diff --git a/tests/testflows/datetime64_extended_range/tests/generic.py b/tests/testflows/datetime64_extended_range/tests/generic.py index 8cb56b99545..6eb117553e0 100644 --- a/tests/testflows/datetime64_extended_range/tests/generic.py +++ b/tests/testflows/datetime64_extended_range/tests/generic.py @@ -5,7 +5,6 @@ from datetime64_extended_range.common import * from datetime64_extended_range.tests.common import * import pytz -import datetime import itertools @TestScenario diff --git a/tests/testflows/datetime64_extended_range/tests/reference_times.py b/tests/testflows/datetime64_extended_range/tests/reference_times.py index 4d4762cc756..cdec3eb260c 100644 --- a/tests/testflows/datetime64_extended_range/tests/reference_times.py +++ b/tests/testflows/datetime64_extended_range/tests/reference_times.py @@ -3,7 +3,6 @@ from datetime import datetime from testflows.core import * from datetime64_extended_range.common import * -from datetime64_extended_range.tests.common import select_check_datetime from datetime64_extended_range.requirements.requirements import * from datetime64_extended_range.tests.common import * diff --git a/tests/testflows/example/regression.py b/tests/testflows/example/regression.py index 4e7ed1025e4..7a0c94a7cd4 100755 --- a/tests/testflows/example/regression.py +++ b/tests/testflows/example/regression.py @@ -11,18 +11,15 @@ from helpers.argparser import argparser @TestFeature @Name("example") @ArgumentParser(argparser) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """Simple example of how you can use TestFlows to test ClickHouse. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1",), } if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "example_env")) as cluster: diff --git a/tests/testflows/extended_precision_data_types/regression.py b/tests/testflows/extended_precision_data_types/regression.py index 8fea6f68e5c..5572381d817 100755 --- a/tests/testflows/extended_precision_data_types/regression.py +++ b/tests/testflows/extended_precision_data_types/regression.py @@ -27,25 +27,20 @@ xflags = { @Requirements( RQ_SRS_020_ClickHouse_Extended_Precision("1.0"), ) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """Extended precision data type regression. """ - - top().terminating = False - nodes = { "clickhouse": ("clickhouse1",) } + with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "extended-precision-data-type_env")) as cluster: self.context.cluster = cluster self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel - Feature(run=load("extended_precision_data_types.tests.feature", "feature")) if main(): diff --git a/tests/testflows/helpers/argparser.py b/tests/testflows/helpers/argparser.py index 03014becb76..63012601e3b 100644 --- a/tests/testflows/helpers/argparser.py +++ b/tests/testflows/helpers/argparser.py @@ -1,12 +1,5 @@ import os -def onoff(v): - if v in ["yes", "1", "on"]: - return True - elif v in ["no", "0", "off"]: - return False - raise ValueError(f"invalid {v}") - def argparser(parser): """Default argument parser for regressions. """ @@ -21,6 +14,3 @@ def argparser(parser): parser.add_argument("--stress", action="store_true", default=False, help="enable stress testing (might take a long time)") - - parser.add_argument("--parallel", type=onoff, default=True, choices=["yes", "no", "on", "off", 0, 1], - help="enable parallelism for tests that support it") \ No newline at end of file diff --git a/tests/testflows/helpers/cluster.py b/tests/testflows/helpers/cluster.py index 0b704093ff8..5b987c1e376 100755 --- a/tests/testflows/helpers/cluster.py +++ b/tests/testflows/helpers/cluster.py @@ -295,7 +295,7 @@ class Cluster(object): self.docker_compose += f" --ansi never --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\"" self.lock = threading.Lock() - + @property def control_shell(self, timeout=300): """Must be called with self.lock.acquired. @@ -339,7 +339,7 @@ class Cluster(object): if node is not None: with self.lock: container_id = self.node_container_id(node=node, timeout=timeout) - + time_start = time.time() while True: try: @@ -367,12 +367,6 @@ class Cluster(object): """ test = current() - if top().terminating: - if test and (test.cflags & MANDATORY and test.subtype is not TestSubType.Given): - pass - else: - raise InterruptedError("terminating") - current_thread = threading.current_thread() id = f"{current_thread.name}-{node}" diff --git a/tests/testflows/helpers/common.py b/tests/testflows/helpers/common.py index 2afcc591f98..6110074b137 100644 --- a/tests/testflows/helpers/common.py +++ b/tests/testflows/helpers/common.py @@ -1,15 +1,11 @@ import testflows.settings as settings - from testflows.core import * -from multiprocessing.dummy import Pool -from multiprocessing import TimeoutError as PoolTaskTimeoutError - @TestStep(Given) def instrument_clickhouse_server_log(self, node=None, test=None, clickhouse_server_log="/var/log/clickhouse-server/clickhouse-server.log"): """Instrument clickhouse-server.log for the current test (default) - by adding start and end messages that include test name to log + by adding start and end messages that include test name to log of the specified node. If we are in the debug mode and the test fails then dump the messages from the log for this test. """ @@ -29,7 +25,7 @@ def instrument_clickhouse_server_log(self, node=None, test=None, yield finally: - if top().terminating is True: + if test.terminating is True: return with Finally("adding test name end message to the clickhouse-server.log", flags=TE): @@ -44,65 +40,3 @@ def instrument_clickhouse_server_log(self, node=None, test=None, with Then("dumping clickhouse-server.log for this test"): node.command(f"tail -c +{start_logsize} {clickhouse_server_log}" f" | head -c {int(end_logsize) - int(start_logsize)}") - -def join(tasks, timeout=None, polling=5): - """Join all parallel tests. - """ - exc = None - - for task in tasks: - task._join_timeout = timeout - - while tasks: - try: - try: - tasks[0].get(timeout=polling) - tasks.pop(0) - - except PoolTaskTimeoutError as e: - task = tasks.pop(0) - if task._join_timeout is not None: - task._join_timeout -= polling - if task._join_timeout <= 0: - raise - tasks.append(task) - continue - - except KeyboardInterrupt as e: - top().terminating = True - raise - - except Exception as e: - tasks.pop(0) - if exc is None: - exc = e - top().terminating = True - - if exc is not None: - raise exc - -def start(pool, tasks, scenario, kwargs=None): - """Start parallel test. - """ - if kwargs is None: - kwargs = {} - - task = pool.apply_async(scenario, [], kwargs) - tasks.append(task) - - return task - -def run_scenario(pool, tasks, scenario, kwargs=None): - if kwargs is None: - kwargs = {} - - _top = top() - def _scenario_wrapper(**kwargs): - if _top.terminating: - return - return scenario(**kwargs) - - if current().context.parallel: - start(pool, tasks, _scenario_wrapper, kwargs) - else: - scenario(**kwargs) diff --git a/tests/testflows/kerberos/regression.py b/tests/testflows/kerberos/regression.py index 0e8b0a55c2e..d1b13acc1c9 100755 --- a/tests/testflows/kerberos/regression.py +++ b/tests/testflows/kerberos/regression.py @@ -21,10 +21,9 @@ xfails = { RQ_SRS_016_Kerberos("1.0") ) @XFails(xfails) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """ClickHouse Kerberos authentication test regression module. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"), "kerberos": ("kerberos", ), @@ -32,8 +31,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "kerberos_env")) as cluster: @@ -43,7 +40,5 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): Feature(run=load("kerberos.tests.config", "config"), flags=TE) Feature(run=load("kerberos.tests.parallel", "parallel"), flags=TE) - - if main(): regression() diff --git a/tests/testflows/kerberos/tests/parallel.py b/tests/testflows/kerberos/tests/parallel.py index 694245e524c..5d352af7df4 100644 --- a/tests/testflows/kerberos/tests/parallel.py +++ b/tests/testflows/kerberos/tests/parallel.py @@ -1,12 +1,6 @@ from testflows.core import * from kerberos.tests.common import * from kerberos.requirements.requirements import * -from multiprocessing.dummy import Pool - -import time -import datetime -import itertools - @TestScenario @Requirements( @@ -28,17 +22,17 @@ def valid_requests_same_credentials(self): return cmd(test_select_query(node=ch_nodes[0])) for i in range(15): - pool = Pool(2) tasks = [] - with When("I try simultaneous authentication"): - tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, ))) - tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, ))) - tasks[0].wait(timeout=200) - tasks[1].wait(timeout=200) + with Pool(2) as pool: + with When("I try simultaneous authentication"): + tasks.append(pool.submit(helper, (ch_nodes[1].cmd, ))) + tasks.append(pool.submit(helper, (ch_nodes[2].cmd, ))) + tasks[0].result(timeout=200) + tasks[1].result(timeout=200) - with Then(f"I expect requests to success"): - assert tasks[0].get(timeout=300).output == "kerberos_user", error() - assert tasks[1].get(timeout=300).output == "kerberos_user", error() + with Then(f"I expect requests to success"): + assert tasks[0].result(timeout=300).output == "kerberos_user", error() + assert tasks[1].result(timeout=300).output == "kerberos_user", error() @TestScenario @@ -61,26 +55,26 @@ def valid_requests_different_credentials(self): return cmd(test_select_query(node=ch_nodes[0])) for i in range(15): - pool = Pool(2) + tasks = [] + with Pool(2) as pool: + with And("add 2 kerberos users via RBAC"): + ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") + ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") - with And("add 2 kerberos users via RBAC"): - ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") - ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") + with When("I try simultaneous authentication for valid and invalid"): + tasks.append(pool.submit(helper, (ch_nodes[1].cmd, ))) + tasks.append(pool.submit(helper, (ch_nodes[2].cmd, ))) + tasks[0].result(timeout=200) + tasks[1].result(timeout=200) - with When("I try simultaneous authentication for valid and invalid"): - tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, ))) - tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, ))) - tasks[0].wait(timeout=200) - tasks[1].wait(timeout=200) + with Then(f"I expect have auth failure"): + assert tasks[1].result(timeout=300).output == "krb2", error() + assert tasks[0].result(timeout=300).output == "krb1", error() - with Then(f"I expect have auth failure"): - assert tasks[1].get(timeout=300).output == "krb2", error() - assert tasks[0].get(timeout=300).output == "krb1", error() - - with Finally("I make sure both users are removed"): - ch_nodes[0].query("DROP USER krb1", no_checks=True) - ch_nodes[0].query("DROP USER krb2", no_checks=True) + with Finally("I make sure both users are removed"): + ch_nodes[0].query("DROP USER krb1", no_checks=True) + ch_nodes[0].query("DROP USER krb2", no_checks=True) @TestScenario @@ -103,15 +97,15 @@ def valid_invalid(self): return cmd(test_select_query(node=ch_nodes[0]), no_checks=True) for i in range(15): - pool = Pool(2) tasks = [] - with When("I try simultaneous authentication for valid and invalid"): - tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, ))) # invalid - tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, ))) # valid + with Pool(2) as pool: + with When("I try simultaneous authentication for valid and invalid"): + tasks.append(pool.submit(helper, (ch_nodes[1].cmd,))) # invalid + tasks.append(pool.submit(helper, (ch_nodes[2].cmd,))) # valid - with Then(f"I expect have auth failure"): - assert tasks[1].get(timeout=300).output == "kerberos_user", error() - assert tasks[0].get(timeout=300).output != "kerberos_user", error() + with Then(f"I expect have auth failure"): + assert tasks[1].result(timeout=300).output == "kerberos_user", error() + assert tasks[0].result(timeout=300).output != "kerberos_user", error() @TestScenario @@ -134,28 +128,27 @@ def deletion(self): return cmd(test_select_query(node=ch_nodes[0], req=f"DROP USER {todel}"), no_checks=True) for i in range(15): - pool = Pool(2) tasks = [] - - with And("add 2 kerberos users via RBAC"): - ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") - ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") - ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb1") - ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb2") + with Pool(2) as pool: + with And("add 2 kerberos users via RBAC"): + ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") + ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'") + ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb1") + ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb2") - with When("I try simultaneous authentication for valid and invalid"): - tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, "krb2"))) - tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, "krb1"))) - tasks[0].wait(timeout=200) - tasks[1].wait(timeout=200) + with When("I try simultaneous authentication for valid and invalid"): + tasks.append(pool.submit(helper, (ch_nodes[1].cmd, "krb2"))) + tasks.append(pool.submit(helper, (ch_nodes[2].cmd, "krb1"))) + tasks[0].result(timeout=200) + tasks[1].result(timeout=200) - with Then(f"I check CH is alive"): - assert ch_nodes[0].query("SELECT 1").output == "1", error() + with Then(f"I check CH is alive"): + assert ch_nodes[0].query("SELECT 1").output == "1", error() - with Finally("I make sure both users are removed"): - ch_nodes[0].query("DROP USER krb1", no_checks=True) - ch_nodes[0].query("DROP USER krb2", no_checks=True) + with Finally("I make sure both users are removed"): + ch_nodes[0].query("DROP USER krb1", no_checks=True) + ch_nodes[0].query("DROP USER krb2", no_checks=True) @TestScenario @@ -177,15 +170,15 @@ def kerberos_and_nonkerberos(self): return cmd(test_select_query(node=ch_nodes[0], krb_auth=krb_auth), no_checks=True) for i in range(15): - pool = Pool(2) tasks = [] - with When("I try simultaneous authentication for valid and invalid"): - tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, False))) # non-kerberos - tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, True))) # kerberos + with Pool(2) as pool: + with When("I try simultaneous authentication for valid and invalid"): + tasks.append(pool.submit(helper, (ch_nodes[1].cmd, False))) # non-kerberos + tasks.append(pool.submit(helper, (ch_nodes[2].cmd, True))) # kerberos - with Then(f"I expect have auth failure"): - assert tasks[1].get(timeout=300).output == "kerberos_user", error() - assert tasks[0].get(timeout=300).output == "default", error() + with Then(f"I expect have auth failure"): + assert tasks[1].result(timeout=300).output == "kerberos_user", error() + assert tasks[0].result(timeout=300).output == "default", error() @TestFeature diff --git a/tests/testflows/map_type/regression.py b/tests/testflows/map_type/regression.py index 9f9c2b2b261..049585dea81 100755 --- a/tests/testflows/map_type/regression.py +++ b/tests/testflows/map_type/regression.py @@ -109,10 +109,9 @@ xflags = { @Specifications( SRS018_ClickHouse_Map_Data_Type ) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """Map type regression. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3") @@ -120,8 +119,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "map_type_env")) as cluster: diff --git a/tests/testflows/regression.py b/tests/testflows/regression.py index bcdde31e619..ba2ea3b111c 100755 --- a/tests/testflows/regression.py +++ b/tests/testflows/regression.py @@ -4,35 +4,31 @@ from testflows.core import * append_path(sys.path, ".") -from helpers.common import Pool, join, run_scenario from helpers.argparser import argparser @TestModule @Name("clickhouse") @ArgumentParser(argparser) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """ClickHouse regression. """ - top().terminating = False - args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path, "stress": stress, "parallel": parallel} + args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path, "stress": stress} self.context.stress = stress - self.context.parallel = parallel - tasks = [] with Pool(8) as pool: try: - run_scenario(pool, tasks, Feature(test=load("example.regression", "regression")), args) + Feature(test=load("example.regression", "regression"), parallel=True, executor=pool)(**args) # run_scenario(pool, tasks, Feature(test=load("ldap.regression", "regression")), args) # run_scenario(pool, tasks, Feature(test=load("rbac.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("aes_encryption.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("map_type.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("window_functions.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("datetime64_extended_range.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("kerberos.regression", "regression")), args) - run_scenario(pool, tasks, Feature(test=load("extended_precision_data_types.regression", "regression")), args) + Feature(test=load("aes_encryption.regression", "regression"), parallel=True, executor=pool)(**args) + Feature(test=load("map_type.regression", "regression"), parallel=True, executor=pool)(**args) + Feature(test=load("window_functions.regression", "regression"), parallel=True, executor=pool)(**args) + Feature(test=load("datetime64_extended_range.regression", "regression"), parallel=True, executor=pool)(**args) + Feature(test=load("kerberos.regression", "regression"), parallel=True, executor=pool)(**args) + Feature(test=load("extended_precision_data_types.regression", "regression"), parallel=True, executor=pool)(**args) finally: - join(tasks) + join() if main(): regression() diff --git a/tests/testflows/window_functions/regression.py b/tests/testflows/window_functions/regression.py index 778a829082f..2c70fc1d075 100755 --- a/tests/testflows/window_functions/regression.py +++ b/tests/testflows/window_functions/regression.py @@ -91,10 +91,9 @@ xflags = { @Requirements( RQ_SRS_019_ClickHouse_WindowFunctions("1.0") ) -def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): +def regression(self, local, clickhouse_binary_path, stress=None): """Window functions regression. """ - top().terminating = False nodes = { "clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3") @@ -102,8 +101,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None): if stress is not None: self.context.stress = stress - if parallel is not None: - self.context.parallel = parallel with Cluster(local, clickhouse_binary_path, nodes=nodes, docker_compose_project_dir=os.path.join(current_dir(), "window_functions_env")) as cluster: