mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Moving to TestFlows 1.7.20 that has native support for parallel tests.
This commit is contained in:
parent
99929981ab
commit
bd1e2eda47
@ -35,7 +35,7 @@ RUN apt-get update \
|
||||
ENV TZ=Europe/Moscow
|
||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||
|
||||
RUN pip3 install urllib3 testflows==1.6.90 docker-compose==1.29.1 docker==5.0.0 dicttoxml kazoo tzlocal python-dateutil numpy
|
||||
RUN pip3 install urllib3 testflows==1.7.20 docker-compose==1.29.1 docker==5.0.0 dicttoxml kazoo tzlocal python-dateutil numpy
|
||||
|
||||
ENV DOCKER_CHANNEL stable
|
||||
ENV DOCKER_VERSION 20.10.6
|
||||
|
@ -6,7 +6,6 @@ from testflows.core import *
|
||||
append_path(sys.path, "..")
|
||||
|
||||
from helpers.cluster import Cluster
|
||||
from helpers.common import Pool, join, run_scenario
|
||||
from helpers.argparser import argparser
|
||||
from aes_encryption.requirements import *
|
||||
|
||||
@ -52,19 +51,19 @@ xfails = {
|
||||
[(Fail, issue_18250)],
|
||||
"compatibility/mysql/:engine/encrypt/mysql_datatype='VARCHAR(100)'/:":
|
||||
[(Fail, issue_18250)],
|
||||
# reinterpretAsFixedString for UUID stopped working
|
||||
# reinterpretAsFixedString for UUID stopped working
|
||||
"decrypt/decryption/mode=:datatype=UUID:":
|
||||
[(Fail, issue_24029)],
|
||||
[(Fail, issue_24029)],
|
||||
"encrypt/:/mode=:datatype=UUID:":
|
||||
[(Fail, issue_24029)],
|
||||
"decrypt/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:":
|
||||
[(Fail, issue_24029)],
|
||||
[(Fail, issue_24029)],
|
||||
"decrypt/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:":
|
||||
[(Fail, issue_24029)],
|
||||
"encrypt_mysql/encryption/mode=:datatype=UUID:":
|
||||
[(Fail, issue_24029)],
|
||||
[(Fail, issue_24029)],
|
||||
"decrypt_mysql/decryption/mode=:datatype=UUID:":
|
||||
[(Fail, issue_24029)],
|
||||
[(Fail, issue_24029)],
|
||||
"decrypt_mysql/invalid ciphertext/mode=:/invalid ciphertext=reinterpretAsFixedString(toUUID:":
|
||||
[(Fail, issue_24029)],
|
||||
[(Fail, issue_24029)],
|
||||
}
|
||||
|
||||
@TestFeature
|
||||
@ -76,33 +75,29 @@ xfails = {
|
||||
RQ_SRS008_AES_Functions_DifferentModes("1.0")
|
||||
)
|
||||
@XFails(xfails)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""ClickHouse AES encryption functions regression module.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
|
||||
}
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "aes_encryption_env")) as cluster:
|
||||
self.context.cluster = cluster
|
||||
|
||||
tasks = []
|
||||
with Pool(5) as pool:
|
||||
try:
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.encrypt", "feature"), flags=TE))
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.decrypt", "feature"), flags=TE))
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.encrypt_mysql", "feature"), flags=TE))
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.decrypt_mysql", "feature"), flags=TE))
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.tests.compatibility.feature", "feature"), flags=TE))
|
||||
Feature(run=load("aes_encryption.tests.encrypt", "feature"), flags=TE, parallel=True, executor=pool)
|
||||
Feature(run=load("aes_encryption.tests.decrypt", "feature"), flags=TE, parallel=True, executor=pool)
|
||||
Feature(run=load("aes_encryption.tests.encrypt_mysql", "feature"), flags=TE, parallel=True, executor=pool)
|
||||
Feature(run=load("aes_encryption.tests.decrypt_mysql", "feature"), flags=TE, parallel=True, executor=pool)
|
||||
Feature(run=load("aes_encryption.tests.compatibility.feature", "feature"), flags=TE, parallel=True, executor=pool)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
||||
if main():
|
||||
regression()
|
||||
|
@ -61,33 +61,29 @@ xfails = {
|
||||
RQ_SRS_010_DateTime64_ExtendedRange("1.0"),
|
||||
)
|
||||
@XFails(xfails)
|
||||
def regression(self, local, clickhouse_binary_path, parallel=False, stress=False):
|
||||
def regression(self, local, clickhouse_binary_path, stress=False):
|
||||
"""ClickHouse DateTime64 Extended Range regression module.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
|
||||
}
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "datetime64_extended_range_env")) as cluster:
|
||||
self.context.cluster = cluster
|
||||
|
||||
tasks = []
|
||||
with Pool(2) as pool:
|
||||
try:
|
||||
run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.generic", "generic")))
|
||||
run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.non_existent_time", "feature")))
|
||||
run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.reference_times", "reference_times")))
|
||||
run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.date_time_functions", "date_time_funcs")))
|
||||
run_scenario(pool, tasks, Scenario(test=load("datetime64_extended_range.tests.type_conversion", "type_conversion")))
|
||||
Scenario(run=load("datetime64_extended_range.tests.generic", "generic"), parallel=True, executor=pool)
|
||||
Scenario(run=load("datetime64_extended_range.tests.non_existent_time", "feature"), parallel=True, executor=pool)
|
||||
Scenario(run=load("datetime64_extended_range.tests.reference_times", "reference_times"), parallel=True, executor=pool)
|
||||
Scenario(run=load("datetime64_extended_range.tests.date_time_functions", "date_time_funcs"), parallel=True, executor=pool)
|
||||
Scenario(run=load("datetime64_extended_range.tests.type_conversion", "type_conversion"), parallel=True, executor=pool)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
||||
if main():
|
||||
regression()
|
||||
|
@ -128,7 +128,6 @@ def walk_datetime_in_incrementing_steps(self, date, hrs_range=(0, 24), step=1, t
|
||||
stress = self.context.stress
|
||||
secs = f"00{'.' * (precision > 0)}{'0' * precision}"
|
||||
|
||||
tasks = []
|
||||
with Pool(2) as pool:
|
||||
try:
|
||||
with When(f"I loop through datetime range {hrs_range} starting from {date} in {step}min increments"):
|
||||
@ -138,11 +137,11 @@ def walk_datetime_in_incrementing_steps(self, date, hrs_range=(0, 24), step=1, t
|
||||
expected = datetime
|
||||
|
||||
with When(f"time is {datetime}"):
|
||||
run_scenario(pool, tasks, Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime),
|
||||
kwargs=dict(datetime=datetime, precision=precision, timezone=timezone,
|
||||
expected=expected))
|
||||
Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime, parallel=True, executor=pool)(
|
||||
datetime=datetime, precision=precision, timezone=timezone,
|
||||
expected=expected)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
||||
|
||||
@TestStep
|
||||
@ -159,7 +158,6 @@ def walk_datetime_in_decrementing_steps(self, date, hrs_range=(23, 0), step=1, t
|
||||
stress = self.context.stress
|
||||
secs = f"00{'.' * (precision > 0)}{'0' * precision}"
|
||||
|
||||
tasks = []
|
||||
with Pool(2) as pool:
|
||||
try:
|
||||
with When(f"I loop through datetime range {hrs_range} starting from {date} in {step}min decrements"):
|
||||
@ -169,8 +167,8 @@ def walk_datetime_in_decrementing_steps(self, date, hrs_range=(23, 0), step=1, t
|
||||
expected = datetime
|
||||
|
||||
with When(f"time is {datetime}"):
|
||||
run_scenario(pool, tasks, Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime),
|
||||
kwargs=dict(datetime=datetime, precision=precision, timezone=timezone,
|
||||
expected=expected))
|
||||
Test(name=f"{hrs}:{mins}:{secs}", test=select_check_datetime, parallel=True, executor=pool)(
|
||||
datetime=datetime, precision=precision, timezone=timezone,
|
||||
expected=expected)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
@ -3,7 +3,7 @@ import pytz
|
||||
import itertools
|
||||
from testflows.core import *
|
||||
import dateutil.relativedelta as rd
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
|
||||
from datetime64_extended_range.requirements.requirements import *
|
||||
from datetime64_extended_range.common import *
|
||||
@ -1536,10 +1536,9 @@ def date_time_funcs(self, node="clickhouse1"):
|
||||
"""
|
||||
self.context.node = self.context.cluster.node(node)
|
||||
|
||||
tasks = []
|
||||
with Pool(4) as pool:
|
||||
try:
|
||||
for scenario in loads(current_module(), Scenario):
|
||||
run_scenario(pool, tasks, Scenario(test=scenario))
|
||||
Scenario(run=scenario, parallel=True, executor=pool)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
@ -5,7 +5,6 @@ from datetime64_extended_range.common import *
|
||||
from datetime64_extended_range.tests.common import *
|
||||
|
||||
import pytz
|
||||
import datetime
|
||||
import itertools
|
||||
|
||||
@TestScenario
|
||||
|
@ -3,7 +3,6 @@ from datetime import datetime
|
||||
|
||||
from testflows.core import *
|
||||
from datetime64_extended_range.common import *
|
||||
from datetime64_extended_range.tests.common import select_check_datetime
|
||||
from datetime64_extended_range.requirements.requirements import *
|
||||
from datetime64_extended_range.tests.common import *
|
||||
|
||||
|
@ -11,18 +11,15 @@ from helpers.argparser import argparser
|
||||
@TestFeature
|
||||
@Name("example")
|
||||
@ArgumentParser(argparser)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""Simple example of how you can use TestFlows to test ClickHouse.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse": ("clickhouse1",),
|
||||
}
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "example_env")) as cluster:
|
||||
|
@ -27,25 +27,20 @@ xflags = {
|
||||
@Requirements(
|
||||
RQ_SRS_020_ClickHouse_Extended_Precision("1.0"),
|
||||
)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""Extended precision data type regression.
|
||||
"""
|
||||
|
||||
top().terminating = False
|
||||
|
||||
nodes = {
|
||||
"clickhouse":
|
||||
("clickhouse1",)
|
||||
}
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "extended-precision-data-type_env")) as cluster:
|
||||
|
||||
self.context.cluster = cluster
|
||||
self.context.stress = stress
|
||||
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
Feature(run=load("extended_precision_data_types.tests.feature", "feature"))
|
||||
|
||||
if main():
|
||||
|
@ -1,12 +1,5 @@
|
||||
import os
|
||||
|
||||
def onoff(v):
|
||||
if v in ["yes", "1", "on"]:
|
||||
return True
|
||||
elif v in ["no", "0", "off"]:
|
||||
return False
|
||||
raise ValueError(f"invalid {v}")
|
||||
|
||||
def argparser(parser):
|
||||
"""Default argument parser for regressions.
|
||||
"""
|
||||
@ -21,6 +14,3 @@ def argparser(parser):
|
||||
|
||||
parser.add_argument("--stress", action="store_true", default=False,
|
||||
help="enable stress testing (might take a long time)")
|
||||
|
||||
parser.add_argument("--parallel", type=onoff, default=True, choices=["yes", "no", "on", "off", 0, 1],
|
||||
help="enable parallelism for tests that support it")
|
@ -295,7 +295,7 @@ class Cluster(object):
|
||||
|
||||
self.docker_compose += f" --ansi never --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
|
||||
self.lock = threading.Lock()
|
||||
|
||||
|
||||
@property
|
||||
def control_shell(self, timeout=300):
|
||||
"""Must be called with self.lock.acquired.
|
||||
@ -339,7 +339,7 @@ class Cluster(object):
|
||||
if node is not None:
|
||||
with self.lock:
|
||||
container_id = self.node_container_id(node=node, timeout=timeout)
|
||||
|
||||
|
||||
time_start = time.time()
|
||||
while True:
|
||||
try:
|
||||
@ -367,12 +367,6 @@ class Cluster(object):
|
||||
"""
|
||||
test = current()
|
||||
|
||||
if top().terminating:
|
||||
if test and (test.cflags & MANDATORY and test.subtype is not TestSubType.Given):
|
||||
pass
|
||||
else:
|
||||
raise InterruptedError("terminating")
|
||||
|
||||
current_thread = threading.current_thread()
|
||||
id = f"{current_thread.name}-{node}"
|
||||
|
||||
|
@ -1,15 +1,11 @@
|
||||
import testflows.settings as settings
|
||||
|
||||
from testflows.core import *
|
||||
|
||||
from multiprocessing.dummy import Pool
|
||||
from multiprocessing import TimeoutError as PoolTaskTimeoutError
|
||||
|
||||
@TestStep(Given)
|
||||
def instrument_clickhouse_server_log(self, node=None, test=None,
|
||||
clickhouse_server_log="/var/log/clickhouse-server/clickhouse-server.log"):
|
||||
"""Instrument clickhouse-server.log for the current test (default)
|
||||
by adding start and end messages that include test name to log
|
||||
by adding start and end messages that include test name to log
|
||||
of the specified node. If we are in the debug mode and the test
|
||||
fails then dump the messages from the log for this test.
|
||||
"""
|
||||
@ -29,7 +25,7 @@ def instrument_clickhouse_server_log(self, node=None, test=None,
|
||||
yield
|
||||
|
||||
finally:
|
||||
if top().terminating is True:
|
||||
if test.terminating is True:
|
||||
return
|
||||
|
||||
with Finally("adding test name end message to the clickhouse-server.log", flags=TE):
|
||||
@ -44,65 +40,3 @@ def instrument_clickhouse_server_log(self, node=None, test=None,
|
||||
with Then("dumping clickhouse-server.log for this test"):
|
||||
node.command(f"tail -c +{start_logsize} {clickhouse_server_log}"
|
||||
f" | head -c {int(end_logsize) - int(start_logsize)}")
|
||||
|
||||
def join(tasks, timeout=None, polling=5):
|
||||
"""Join all parallel tests.
|
||||
"""
|
||||
exc = None
|
||||
|
||||
for task in tasks:
|
||||
task._join_timeout = timeout
|
||||
|
||||
while tasks:
|
||||
try:
|
||||
try:
|
||||
tasks[0].get(timeout=polling)
|
||||
tasks.pop(0)
|
||||
|
||||
except PoolTaskTimeoutError as e:
|
||||
task = tasks.pop(0)
|
||||
if task._join_timeout is not None:
|
||||
task._join_timeout -= polling
|
||||
if task._join_timeout <= 0:
|
||||
raise
|
||||
tasks.append(task)
|
||||
continue
|
||||
|
||||
except KeyboardInterrupt as e:
|
||||
top().terminating = True
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
tasks.pop(0)
|
||||
if exc is None:
|
||||
exc = e
|
||||
top().terminating = True
|
||||
|
||||
if exc is not None:
|
||||
raise exc
|
||||
|
||||
def start(pool, tasks, scenario, kwargs=None):
|
||||
"""Start parallel test.
|
||||
"""
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
task = pool.apply_async(scenario, [], kwargs)
|
||||
tasks.append(task)
|
||||
|
||||
return task
|
||||
|
||||
def run_scenario(pool, tasks, scenario, kwargs=None):
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
|
||||
_top = top()
|
||||
def _scenario_wrapper(**kwargs):
|
||||
if _top.terminating:
|
||||
return
|
||||
return scenario(**kwargs)
|
||||
|
||||
if current().context.parallel:
|
||||
start(pool, tasks, _scenario_wrapper, kwargs)
|
||||
else:
|
||||
scenario(**kwargs)
|
||||
|
@ -21,10 +21,9 @@ xfails = {
|
||||
RQ_SRS_016_Kerberos("1.0")
|
||||
)
|
||||
@XFails(xfails)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""ClickHouse Kerberos authentication test regression module.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
|
||||
"kerberos": ("kerberos", ),
|
||||
@ -32,8 +31,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "kerberos_env")) as cluster:
|
||||
@ -43,7 +40,5 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
Feature(run=load("kerberos.tests.config", "config"), flags=TE)
|
||||
Feature(run=load("kerberos.tests.parallel", "parallel"), flags=TE)
|
||||
|
||||
|
||||
|
||||
if main():
|
||||
regression()
|
||||
|
@ -1,12 +1,6 @@
|
||||
from testflows.core import *
|
||||
from kerberos.tests.common import *
|
||||
from kerberos.requirements.requirements import *
|
||||
from multiprocessing.dummy import Pool
|
||||
|
||||
import time
|
||||
import datetime
|
||||
import itertools
|
||||
|
||||
|
||||
@TestScenario
|
||||
@Requirements(
|
||||
@ -28,17 +22,17 @@ def valid_requests_same_credentials(self):
|
||||
return cmd(test_select_query(node=ch_nodes[0]))
|
||||
|
||||
for i in range(15):
|
||||
pool = Pool(2)
|
||||
tasks = []
|
||||
with When("I try simultaneous authentication"):
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, )))
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, )))
|
||||
tasks[0].wait(timeout=200)
|
||||
tasks[1].wait(timeout=200)
|
||||
with Pool(2) as pool:
|
||||
with When("I try simultaneous authentication"):
|
||||
tasks.append(pool.submit(helper, (ch_nodes[1].cmd, )))
|
||||
tasks.append(pool.submit(helper, (ch_nodes[2].cmd, )))
|
||||
tasks[0].result(timeout=200)
|
||||
tasks[1].result(timeout=200)
|
||||
|
||||
with Then(f"I expect requests to success"):
|
||||
assert tasks[0].get(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[1].get(timeout=300).output == "kerberos_user", error()
|
||||
with Then(f"I expect requests to success"):
|
||||
assert tasks[0].result(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[1].result(timeout=300).output == "kerberos_user", error()
|
||||
|
||||
|
||||
@TestScenario
|
||||
@ -61,26 +55,26 @@ def valid_requests_different_credentials(self):
|
||||
return cmd(test_select_query(node=ch_nodes[0]))
|
||||
|
||||
for i in range(15):
|
||||
pool = Pool(2)
|
||||
|
||||
tasks = []
|
||||
with Pool(2) as pool:
|
||||
with And("add 2 kerberos users via RBAC"):
|
||||
ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
|
||||
with And("add 2 kerberos users via RBAC"):
|
||||
ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.submit(helper, (ch_nodes[1].cmd, )))
|
||||
tasks.append(pool.submit(helper, (ch_nodes[2].cmd, )))
|
||||
tasks[0].result(timeout=200)
|
||||
tasks[1].result(timeout=200)
|
||||
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, )))
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, )))
|
||||
tasks[0].wait(timeout=200)
|
||||
tasks[1].wait(timeout=200)
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].result(timeout=300).output == "krb2", error()
|
||||
assert tasks[0].result(timeout=300).output == "krb1", error()
|
||||
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].get(timeout=300).output == "krb2", error()
|
||||
assert tasks[0].get(timeout=300).output == "krb1", error()
|
||||
|
||||
with Finally("I make sure both users are removed"):
|
||||
ch_nodes[0].query("DROP USER krb1", no_checks=True)
|
||||
ch_nodes[0].query("DROP USER krb2", no_checks=True)
|
||||
with Finally("I make sure both users are removed"):
|
||||
ch_nodes[0].query("DROP USER krb1", no_checks=True)
|
||||
ch_nodes[0].query("DROP USER krb2", no_checks=True)
|
||||
|
||||
|
||||
@TestScenario
|
||||
@ -103,15 +97,15 @@ def valid_invalid(self):
|
||||
return cmd(test_select_query(node=ch_nodes[0]), no_checks=True)
|
||||
|
||||
for i in range(15):
|
||||
pool = Pool(2)
|
||||
tasks = []
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, ))) # invalid
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, ))) # valid
|
||||
with Pool(2) as pool:
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.submit(helper, (ch_nodes[1].cmd,))) # invalid
|
||||
tasks.append(pool.submit(helper, (ch_nodes[2].cmd,))) # valid
|
||||
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].get(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[0].get(timeout=300).output != "kerberos_user", error()
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].result(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[0].result(timeout=300).output != "kerberos_user", error()
|
||||
|
||||
|
||||
@TestScenario
|
||||
@ -134,28 +128,27 @@ def deletion(self):
|
||||
return cmd(test_select_query(node=ch_nodes[0], req=f"DROP USER {todel}"), no_checks=True)
|
||||
|
||||
for i in range(15):
|
||||
pool = Pool(2)
|
||||
tasks = []
|
||||
|
||||
with And("add 2 kerberos users via RBAC"):
|
||||
ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb1")
|
||||
ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb2")
|
||||
with Pool(2) as pool:
|
||||
with And("add 2 kerberos users via RBAC"):
|
||||
ch_nodes[0].query("CREATE USER krb1 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("CREATE USER krb2 IDENTIFIED WITH kerberos REALM 'EXAMPLE.COM'")
|
||||
ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb1")
|
||||
ch_nodes[0].query("GRANT ACCESS MANAGEMENT ON *.* TO krb2")
|
||||
|
||||
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, "krb2")))
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, "krb1")))
|
||||
tasks[0].wait(timeout=200)
|
||||
tasks[1].wait(timeout=200)
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.submit(helper, (ch_nodes[1].cmd, "krb2")))
|
||||
tasks.append(pool.submit(helper, (ch_nodes[2].cmd, "krb1")))
|
||||
tasks[0].result(timeout=200)
|
||||
tasks[1].result(timeout=200)
|
||||
|
||||
with Then(f"I check CH is alive"):
|
||||
assert ch_nodes[0].query("SELECT 1").output == "1", error()
|
||||
with Then(f"I check CH is alive"):
|
||||
assert ch_nodes[0].query("SELECT 1").output == "1", error()
|
||||
|
||||
with Finally("I make sure both users are removed"):
|
||||
ch_nodes[0].query("DROP USER krb1", no_checks=True)
|
||||
ch_nodes[0].query("DROP USER krb2", no_checks=True)
|
||||
with Finally("I make sure both users are removed"):
|
||||
ch_nodes[0].query("DROP USER krb1", no_checks=True)
|
||||
ch_nodes[0].query("DROP USER krb2", no_checks=True)
|
||||
|
||||
|
||||
@TestScenario
|
||||
@ -177,15 +170,15 @@ def kerberos_and_nonkerberos(self):
|
||||
return cmd(test_select_query(node=ch_nodes[0], krb_auth=krb_auth), no_checks=True)
|
||||
|
||||
for i in range(15):
|
||||
pool = Pool(2)
|
||||
tasks = []
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[1].cmd, False))) # non-kerberos
|
||||
tasks.append(pool.apply_async(helper, (ch_nodes[2].cmd, True))) # kerberos
|
||||
with Pool(2) as pool:
|
||||
with When("I try simultaneous authentication for valid and invalid"):
|
||||
tasks.append(pool.submit(helper, (ch_nodes[1].cmd, False))) # non-kerberos
|
||||
tasks.append(pool.submit(helper, (ch_nodes[2].cmd, True))) # kerberos
|
||||
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].get(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[0].get(timeout=300).output == "default", error()
|
||||
with Then(f"I expect have auth failure"):
|
||||
assert tasks[1].result(timeout=300).output == "kerberos_user", error()
|
||||
assert tasks[0].result(timeout=300).output == "default", error()
|
||||
|
||||
|
||||
@TestFeature
|
||||
|
@ -109,10 +109,9 @@ xflags = {
|
||||
@Specifications(
|
||||
SRS018_ClickHouse_Map_Data_Type
|
||||
)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""Map type regression.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse":
|
||||
("clickhouse1", "clickhouse2", "clickhouse3")
|
||||
@ -120,8 +119,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "map_type_env")) as cluster:
|
||||
|
@ -4,35 +4,31 @@ from testflows.core import *
|
||||
|
||||
append_path(sys.path, ".")
|
||||
|
||||
from helpers.common import Pool, join, run_scenario
|
||||
from helpers.argparser import argparser
|
||||
|
||||
@TestModule
|
||||
@Name("clickhouse")
|
||||
@ArgumentParser(argparser)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""ClickHouse regression.
|
||||
"""
|
||||
top().terminating = False
|
||||
args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path, "stress": stress, "parallel": parallel}
|
||||
args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path, "stress": stress}
|
||||
|
||||
self.context.stress = stress
|
||||
self.context.parallel = parallel
|
||||
|
||||
tasks = []
|
||||
with Pool(8) as pool:
|
||||
try:
|
||||
run_scenario(pool, tasks, Feature(test=load("example.regression", "regression")), args)
|
||||
Feature(test=load("example.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
# run_scenario(pool, tasks, Feature(test=load("ldap.regression", "regression")), args)
|
||||
# run_scenario(pool, tasks, Feature(test=load("rbac.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("aes_encryption.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("map_type.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("window_functions.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("datetime64_extended_range.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("kerberos.regression", "regression")), args)
|
||||
run_scenario(pool, tasks, Feature(test=load("extended_precision_data_types.regression", "regression")), args)
|
||||
Feature(test=load("aes_encryption.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
Feature(test=load("map_type.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
Feature(test=load("window_functions.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
Feature(test=load("datetime64_extended_range.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
Feature(test=load("kerberos.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
Feature(test=load("extended_precision_data_types.regression", "regression"), parallel=True, executor=pool)(**args)
|
||||
finally:
|
||||
join(tasks)
|
||||
join()
|
||||
|
||||
if main():
|
||||
regression()
|
||||
|
@ -91,10 +91,9 @@ xflags = {
|
||||
@Requirements(
|
||||
RQ_SRS_019_ClickHouse_WindowFunctions("1.0")
|
||||
)
|
||||
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
def regression(self, local, clickhouse_binary_path, stress=None):
|
||||
"""Window functions regression.
|
||||
"""
|
||||
top().terminating = False
|
||||
nodes = {
|
||||
"clickhouse":
|
||||
("clickhouse1", "clickhouse2", "clickhouse3")
|
||||
@ -102,8 +101,6 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
|
||||
|
||||
if stress is not None:
|
||||
self.context.stress = stress
|
||||
if parallel is not None:
|
||||
self.context.parallel = parallel
|
||||
|
||||
with Cluster(local, clickhouse_binary_path, nodes=nodes,
|
||||
docker_compose_project_dir=os.path.join(current_dir(), "window_functions_env")) as cluster:
|
||||
|
Loading…
Reference in New Issue
Block a user