Adding support to run modules in parallel.

Adding fixes to ldap and rbac tests.
Adding user_dn_detection tests in LDAP role mapping.
This commit is contained in:
Vitaliy Zakaznikov 2021-05-07 09:14:40 -04:00
parent 617e71b3f2
commit 0b95bfb38e
74 changed files with 976 additions and 523 deletions

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -63,11 +64,13 @@ xfails = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse AES encryption functions regression module.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "aes_encryption_env")) as cluster:
self.context.cluster = cluster
Feature(run=load("aes_encryption.tests.encrypt", "feature"), flags=TE)

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -13,11 +14,13 @@ from helpers.argparser import argparser
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""Simple example of how you can use TestFlows to test ClickHouse.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1",),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "example_env")) as cluster:
self.context.cluster = cluster
Scenario(run=load("example.tests.example", "scenario"))

View File

@ -4,10 +4,31 @@ import inspect
import threading
import tempfile
import testflows.settings as settings
from testflows.core import *
from testflows.asserts import error
from testflows.connect import Shell
from testflows.connect import Shell as ShellBase
from testflows.uexpect import ExpectTimeoutError
from testflows._core.testtype import TestSubType
class Shell(ShellBase):
def __exit__(self, type, value, traceback):
# send exit and Ctrl-D repeatedly
# to terminate any open shell commands.
# This is needed for example
# to solve a problem with
# 'docker-compose exec {name} bash --noediting'
# that does not clean up open bash processes
# if not exited normally
for i in range(10):
if self.child is not None:
try:
self.send('exit\r', eol='')
self.send('\x04\r', eol='')
except OSError:
pass
return super(Shell, self).__exit__(type, value, traceback)
class QueryRuntimeException(Exception):
"""Exception during query execution on the server.
@ -26,8 +47,8 @@ class Node(object):
def repr(self):
return f"Node(name='{self.name}')"
def restart(self, timeout=300, retries=5, safe=True):
"""Restart node.
def close_bashes(self):
"""Close all active bashes to the node.
"""
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
@ -35,6 +56,11 @@ class Node(object):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
def restart(self, timeout=300, retries=5, safe=True):
"""Restart node.
"""
self.close_bashes()
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
if r.exitcode == 0:
@ -51,11 +77,7 @@ class Node(object):
def stop(self, timeout=300, retries=5, safe=True):
"""Stop node.
"""
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
self.close_bashes()
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
@ -110,7 +132,7 @@ class ClickHouseNode(Node):
with By(f"waiting until container {self.name} is healthy"):
start_time = time.time()
while True:
if self.query("select 1", no_checks=1, timeout=300, steps=False).exitcode == 0:
if self.query("select 1", no_checks=1, timeout=10, steps=False).exitcode == 0:
break
if time.time() - start_time < timeout:
time.sleep(2)
@ -129,11 +151,7 @@ class ClickHouseNode(Node):
with And("forcing to sync everything to disk"):
self.command("sync", timeout=30)
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
self.close_bashes()
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} stop {self.name}', timeout=timeout)
@ -163,11 +181,7 @@ class ClickHouseNode(Node):
with And("forcing to sync everything to disk"):
self.command("sync", timeout=30)
with self.cluster.lock:
for key in list(self.cluster._bash.keys()):
if key.endswith(f"-{self.name}"):
shell = self.cluster._bash.pop(key)
shell.__exit__(None, None, None)
self.close_bashes()
for retry in range(retries):
r = self.cluster.command(None, f'{self.cluster.docker_compose} restart {self.name}', timeout=timeout)
@ -247,8 +261,8 @@ class Cluster(object):
docker_compose="docker-compose", docker_compose_project_dir=None,
docker_compose_file="docker-compose.yml"):
self.terminating = False
self._bash = {}
self.environ = {}
self.clickhouse_binary_path = clickhouse_binary_path
self.configs_dir = configs_dir
self.local = local
@ -278,7 +292,7 @@ class Cluster(object):
if not os.path.exists(docker_compose_file_path):
raise TypeError("docker compose file '{docker_compose_file_path}' does not exist")
self.docker_compose += f" --no-ansi --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
self.docker_compose += f" --ansi never --project-directory \"{docker_compose_project_dir}\" --file \"{docker_compose_file_path}\""
self.lock = threading.Lock()
def shell(self, node, timeout=300):
@ -288,7 +302,7 @@ class Cluster(object):
return Shell()
shell = Shell(command=[
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} bash --noediting"
"/bin/bash", "--noediting", "-c", f"docker exec -it $({self.docker_compose} ps -q {node}) bash --noediting"
], name=node)
shell.timeout = timeout
@ -301,8 +315,8 @@ class Cluster(object):
"""
test = current()
if self.terminating:
if test and (test.cflags & MANDATORY):
if top().terminating:
if test and (test.cflags & MANDATORY and test.subtype is not TestSubType.Given):
pass
else:
raise InterruptedError("terminating")
@ -314,9 +328,11 @@ class Cluster(object):
if self._bash.get(id) is None:
if node is None:
self._bash[id] = Shell().__enter__()
for name,value in self.environ.items():
self._bash[id](f"export {name}={value}")
else:
self._bash[id] = Shell(command=[
"/bin/bash", "--noediting", "-c", f"{self.docker_compose} exec {node} {command}"
"/bin/bash", "--noediting", "-c", f"docker exec -it $({self.docker_compose} ps -q {node}) {command}"
], name=node).__enter__()
self._bash[id].timeout = timeout
@ -366,8 +382,11 @@ class Cluster(object):
def down(self, timeout=300):
"""Bring cluster down by executing docker-compose down."""
self.terminating = True
# add message to each clickhouse-server.log
if settings.debug:
for node in self.nodes["clickhouse"]:
self.command(node=node, command=f"echo -e \"\n-- sending stop to: {node} --\n\" >> /var/log/clickhouse-server/clickhouse-server.log")
try:
bash = self.bash(None)
with self.lock:
@ -390,39 +409,54 @@ class Cluster(object):
assert os.path.exists(self.clickhouse_binary_path)
with And("I set all the necessary environment variables"):
os.environ["COMPOSE_HTTP_TIMEOUT"] = "300"
os.environ["CLICKHOUSE_TESTS_SERVER_BIN_PATH"] = self.clickhouse_binary_path
os.environ["CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"] = os.path.join(
self.environ["COMPOSE_HTTP_TIMEOUT"] = "300"
self.environ["CLICKHOUSE_TESTS_SERVER_BIN_PATH"] = self.clickhouse_binary_path
self.environ["CLICKHOUSE_TESTS_ODBC_BRIDGE_BIN_PATH"] = os.path.join(
os.path.dirname(self.clickhouse_binary_path), "clickhouse-odbc-bridge")
os.environ["CLICKHOUSE_TESTS_DIR"] = self.configs_dir
self.environ["CLICKHOUSE_TESTS_DIR"] = self.configs_dir
with And("I list environment variables to show their values"):
self.command(None, "env | grep CLICKHOUSE")
with Given("docker-compose"):
max_attempts = 5
max_up_attempts = 1
for attempt in range(max_attempts):
with When(f"attempt {attempt}/{max_attempts}"):
with By("pulling images for all the services"):
cmd = self.command(None, f"{self.docker_compose} pull 2>&1 | tee", exitcode=None, timeout=timeout)
if cmd.exitcode != 0:
continue
with And("checking if any containers are already running"):
self.command(None, f"{self.docker_compose} ps | tee")
with And("executing docker-compose down just in case it is up"):
cmd = self.command(None, f"{self.docker_compose} down --remove-orphans 2>&1 | tee", exitcode=None, timeout=timeout)
cmd = self.command(None, f"{self.docker_compose} down 2>&1 | tee", exitcode=None, timeout=timeout)
if cmd.exitcode != 0:
continue
with And("checking if any containers are still left running"):
self.command(None, f"{self.docker_compose} ps | tee")
with And("executing docker-compose up"):
cmd = self.command(None, f"{self.docker_compose} up -d 2>&1 | tee", timeout=timeout)
for up_attempt in range(max_up_attempts):
with By(f"attempt {up_attempt}/{max_up_attempts}"):
cmd = self.command(None, f"{self.docker_compose} up --renew-anon-volumes --force-recreate --timeout 300 -d 2>&1 | tee", timeout=timeout)
if "is unhealthy" not in cmd.output:
break
with Then("check there are no unhealthy containers"):
if "is unhealthy" in cmd.output:
self.command(None, f"{self.docker_compose} ps | tee")
ps_cmd = self.command(None, f"{self.docker_compose} ps | tee | grep -v \"Exit 0\"")
if "is unhealthy" in cmd.output or "Exit" in ps_cmd.output:
self.command(None, f"{self.docker_compose} logs | tee")
continue
if cmd.exitcode == 0:
if cmd.exitcode == 0 and "is unhealthy" not in cmd.output and "Exit" not in ps_cmd.output:
break
if cmd.exitcode != 0:
if cmd.exitcode != 0 or "is unhealthy" in cmd.output or "Exit" in ps_cmd.output:
fail("could not bring up docker-compose cluster")
with Then("wait all nodes report healhy"):

View File

@ -0,0 +1,102 @@
import testflows.settings as settings
from testflows.core import *
from multiprocessing.dummy import Pool
from multiprocessing import TimeoutError as PoolTaskTimeoutError
@TestStep(Given)
def instrument_clickhouse_server_log(self, node=None, test=None,
clickhouse_server_log="/var/log/clickhouse-server/clickhouse-server.log"):
"""Instrument clickhouse-server.log for the current test (default)
by adding start and end messages that include test name to log
of the specified node. If we are in the debug mode and the test
fails then dump the messages from the log for this test.
"""
if test is None:
test = current()
if node is None:
node = self.context.node
with By("getting current log size"):
cmd = node.command(f"stat --format=%s {clickhouse_server_log}")
start_logsize = cmd.output.split(" ")[0].strip()
try:
with And("adding test name start message to the clickhouse-server.log"):
node.command(f"echo -e \"\\n-- start: {test.name} --\\n\" >> {clickhouse_server_log}")
yield
finally:
if top().terminating is True:
return
with Finally("adding test name end message to the clickhouse-server.log", flags=TE):
node.command(f"echo -e \"\\n-- end: {test.name} --\\n\" >> {clickhouse_server_log}")
with And("getting current log size at the end of the test"):
cmd = node.command(f"stat --format=%s {clickhouse_server_log}")
end_logsize = cmd.output.split(" ")[0].strip()
with And("checking if test has failing result"):
if settings.debug and not self.parent.result:
with Then("dumping clickhouse-server.log for this test"):
node.command(f"tail -c +{start_logsize} {clickhouse_server_log}"
f" | head -c {int(end_logsize) - int(start_logsize)}")
def join(tasks, timeout=None, polling=5):
"""Join all parallel tests.
"""
exc = None
for task in tasks:
task._join_timeout = timeout
while tasks:
try:
try:
tasks[0].get(timeout=polling)
tasks.pop(0)
except PoolTaskTimeoutError as e:
task = tasks.pop(0)
if task._join_timeout is not None:
task._join_timeout -= polling
if task._join_timeout <= 0:
raise
tasks.append(task)
continue
except KeyboardInterrupt as e:
top().terminating = True
raise
except Exception as e:
tasks.pop(0)
if exc is None:
exc = e
top().terminating = True
if exc is not None:
raise exc
def start(pool, tasks, scenario, kwargs=None):
"""Start parallel test.
"""
if kwargs is None:
kwargs = {}
task = pool.apply_async(scenario, [], kwargs)
tasks.append(task)
return task
def run_scenario(pool, tasks, scenario, kwargs=None):
if kwargs is None:
kwargs = {}
if current().context.parallel:
start(pool, tasks, scenario, kwargs)
else:
scenario(**kwargs)

6
tests/testflows/kerberos/regression.py Normal file → Executable file
View File

@ -1,3 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -21,12 +23,14 @@ xfails = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse Kerberos authentication test regression module.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
"kerberos": ("kerberos", ),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "kerberos_env")) as cluster:
self.context.cluster = cluster
Feature(run=load("kerberos.tests.generic", "generic"), flags=TE)

View File

@ -135,7 +135,7 @@ services:
zookeeper:
condition: service_healthy
# dummy service which does nothing, but allows to postpone
# dummy service which does nothing, but allows to postpone
# 'docker-compose up -d' till all dependecies will go healthy
all_services_ready:
image: hello-world

View File

@ -24,11 +24,8 @@ services:
phpldapadmin:
image: osixia/phpldapadmin:0.9.0
container_name: phpldapadmin
environment:
PHPLDAPADMIN_HTTPS=false:
ports:
- "8080:80"
healthcheck:
test: echo 1
interval: 10s
@ -37,3 +34,4 @@ services:
start_period: 300s
security_opt:
- label:disable

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
zookeeper:
image: zookeeper:3.6.2
image: zookeeper:3.4.12
expose:
- "2181"
environment:

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -39,11 +40,13 @@ xfails = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse integration with LDAP regression module.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "ldap_authentication_env")) as cluster:
self.context.cluster = cluster
if stress is not None or not hasattr(self.context, "stress"):

View File

@ -2,7 +2,7 @@
import random
import time
from multiprocessing.dummy import Pool
from helpers.common import Pool, join
from testflows.core import *
from testflows.asserts import error
from ldap.authentication.tests.common import *
@ -64,7 +64,7 @@ def add_user_to_ldap_and_login(self, server, user=None, ch_user=None, login=None
RQ_SRS_007_LDAP_Authentication_Parallel("1.0"),
RQ_SRS_007_LDAP_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login(self, server, user_count=10, timeout=200, rbac=False):
def parallel_login(self, server, user_count=10, timeout=300, rbac=False):
"""Check that login of valid and invalid LDAP authenticated users works in parallel.
"""
self.context.ldap_node = self.context.cluster.node(server)
@ -103,17 +103,17 @@ def parallel_login(self, server, user_count=10, timeout=200, rbac=False):
steps=False)
with When("I login in parallel"):
p = Pool(15)
tasks = []
for i in range(5):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
with Then("it should work"):
for task in tasks:
task.get(timeout=timeout)
with Pool(4) as pool:
try:
for i in range(5):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout=timeout)
@TestScenario
@Requirements(
RQ_SRS_007_LDAP_Authentication_Invalid("1.0"),
@ -530,9 +530,6 @@ def valid_verification_cooldown_value_cn_change(self, server, rbac=False, timeou
after successful authentication when the verification_cooldown parameter
is set and the user cn is changed.
"""
error_message = "DB::Exception: testVCD: Authentication failed: password is incorrect or there is no user with such name"
error_exitcode = 4
user = None
new_user = None
@ -543,7 +540,7 @@ def valid_verification_cooldown_value_cn_change(self, server, rbac=False, timeou
"enable_tls": "no",
"auth_dn_prefix": "cn=",
"auth_dn_suffix": ",ou=users,dc=company,dc=com",
"verification_cooldown": "2"
"verification_cooldown": "600"
}}
self.context.ldap_node = self.context.cluster.node(server)
@ -563,12 +560,6 @@ def valid_verification_cooldown_value_cn_change(self, server, rbac=False, timeou
with Then("when I try to login again with the old user cn it should work"):
login_and_execute_query(username=user["cn"], password=user["userpassword"])
with And("when I sleep for 2 seconds and try to log in, it should fail"):
time.sleep(2)
login_and_execute_query(username=user["cn"], password=user["userpassword"],
exitcode=error_exitcode, message=error_message)
finally:
with Finally("I make sure LDAP user is deleted"):
if new_user is not None:
@ -584,9 +575,6 @@ def valid_verification_cooldown_value_password_change(self, server, rbac=False,
after successful authentication when the verification_cooldown parameter
is set and the user password is changed.
"""
error_message = "DB::Exception: testVCD: Authentication failed: password is incorrect or there is no user with such name"
error_exitcode = 4
user = None
with Given("I have an LDAP configuration that sets verification_cooldown parameter to 2 sec"):
@ -596,7 +584,7 @@ def valid_verification_cooldown_value_password_change(self, server, rbac=False,
"enable_tls": "no",
"auth_dn_prefix": "cn=",
"auth_dn_suffix": ",ou=users,dc=company,dc=com",
"verification_cooldown": "2"
"verification_cooldown": "600"
}}
self.context.ldap_node = self.context.cluster.node(server)
@ -616,12 +604,6 @@ def valid_verification_cooldown_value_password_change(self, server, rbac=False,
with Then("when I try to login again with the old password it should work"):
login_and_execute_query(username=user["cn"], password=user["userpassword"])
with And("when I sleep for 2 seconds and try to log in, it should fail"):
time.sleep(2)
login_and_execute_query(username=user["cn"], password=user["userpassword"],
exitcode=error_exitcode, message=error_message)
finally:
with Finally("I make sure LDAP user is deleted"):
if user is not None:
@ -637,9 +619,6 @@ def valid_verification_cooldown_value_ldap_unavailable(self, server, rbac=False,
after successful authentication when the verification_cooldown parameter
is set, even when the LDAP server is offline.
"""
error_message = "DB::Exception: testVCD: Authentication failed: password is incorrect or there is no user with such name"
error_exitcode = 4
user = None
with Given("I have an LDAP configuration that sets verification_cooldown parameter to 2 sec"):
@ -649,7 +628,7 @@ def valid_verification_cooldown_value_ldap_unavailable(self, server, rbac=False,
"enable_tls": "no",
"auth_dn_prefix": "cn=",
"auth_dn_suffix": ",ou=users,dc=company,dc=com",
"verification_cooldown": "2"
"verification_cooldown": "600"
}}
self.context.ldap_node = self.context.cluster.node(server)
@ -672,12 +651,6 @@ def valid_verification_cooldown_value_ldap_unavailable(self, server, rbac=False,
with Then("when I try to login again with the server offline it should work"):
login_and_execute_query(username=user["cn"], password=user["userpassword"])
with And("when I sleep for 2 seconds and try to log in, it should fail"):
time.sleep(2)
login_and_execute_query(username=user["cn"], password=user["userpassword"],
exitcode=error_exitcode, message=error_message)
finally:
with Finally("I start the ldap server back up"):
self.context.ldap_node.start()

View File

@ -47,7 +47,7 @@ ASCII_CHARS = string.ascii_lowercase + string.ascii_uppercase + string.digits
def randomword(length, chars=ASCII_CHARS):
return ''.join(random.choice(chars) for i in range(length))
def restart(node=None, safe=False, timeout=60):
def restart(node=None, safe=False, timeout=300):
"""Restart ClickHouse server and wait for config to be reloaded.
"""
with When("I restart ClickHouse server node"):
@ -78,7 +78,7 @@ def restart(node=None, safe=False, timeout=60):
f"ConfigReloader: Loaded config '/etc/clickhouse-server/config.xml', performed update on configuration",
timeout=timeout)
def add_config(config, timeout=60, restart=False, modify=False):
def add_config(config, timeout=300, restart=False, modify=False):
"""Add dynamic configuration file to ClickHouse.
:param node: node
@ -86,6 +86,7 @@ def add_config(config, timeout=60, restart=False, modify=False):
:param timeout: timeout, default: 20 sec
"""
node = current().context.node
cluster = current().context.cluster
def check_preprocessed_config_is_updated(after_removal=False):
"""Check that preprocessed config is updated.
@ -123,7 +124,7 @@ def add_config(config, timeout=60, restart=False, modify=False):
with And("I get the current log size"):
cmd = node.cluster.command(None,
f"stat --format=%s {os.environ['CLICKHOUSE_TESTS_DIR']}/_instances/{node.name}/logs/clickhouse-server.log")
f"stat --format=%s {cluster.environ['CLICKHOUSE_TESTS_DIR']}/_instances/{node.name}/logs/clickhouse-server.log")
logsize = cmd.output.split(" ")[0].strip()
with And("I start ClickHouse back up"):
@ -210,7 +211,7 @@ def modify_config(config, restart=False):
@contextmanager
def ldap_servers(servers, config_d_dir="/etc/clickhouse-server/config.d", config_file="ldap_servers.xml",
timeout=60, restart=False, config=None):
timeout=300, restart=False, config=None):
"""Add LDAP servers configuration.
"""
if config is None:
@ -248,7 +249,7 @@ def add_users_identified_with_ldap(*users):
try:
with Given("I create users"):
for user in users:
node.query(f"CREATE USER '{user['username']}' IDENTIFIED WITH ldap SERVER '{user['server']}'")
node.query(f"CREATE USER '{user['username']}' IDENTIFIED WITH LDAP SERVER '{user['server']}'")
yield
finally:
with Finally("I remove users"):
@ -258,7 +259,7 @@ def add_users_identified_with_ldap(*users):
@contextmanager
def ldap_authenticated_users(*users, config_d_dir="/etc/clickhouse-server/users.d",
config_file=None, timeout=60, restart=True, config=None, rbac=False):
config_file=None, timeout=300, restart=True, config=None, rbac=False):
"""Add LDAP authenticated users.
"""
if rbac:
@ -268,9 +269,9 @@ def ldap_authenticated_users(*users, config_d_dir="/etc/clickhouse-server/users.
config_file = f"ldap_users_{getuid()}.xml"
if config is None:
config = create_ldap_users_config_content(*users, config_d_dir=config_d_dir, config_file=config_file)
return add_config(config, restart=restart)
return add_config(config, timeout=timeout, restart=restart)
def invalid_server_config(servers, message=None, tail=30, timeout=60):
def invalid_server_config(servers, message=None, tail=30, timeout=300):
"""Check that ClickHouse errors when trying to load invalid LDAP servers configuration file.
"""
node = current().context.node
@ -299,7 +300,7 @@ def invalid_server_config(servers, message=None, tail=30, timeout=60):
with By("removing the config file", description=config.path):
node.command(f"rm -rf {config.path}", exitcode=0)
def invalid_user_config(servers, config, message=None, tail=30, timeout=60):
def invalid_user_config(servers, config, message=None, tail=30, timeout=300):
"""Check that ClickHouse errors when trying to load invalid LDAP users configuration file.
"""
node = current().context.node

View File

@ -135,7 +135,7 @@ services:
zookeeper:
condition: service_healthy
# dummy service which does nothing, but allows to postpone
# dummy service which does nothing, but allows to postpone
# 'docker-compose up -d' till all dependecies will go healthy
all_services_ready:
image: hello-world

View File

@ -24,11 +24,8 @@ services:
phpldapadmin:
image: osixia/phpldapadmin:0.9.0
container_name: phpldapadmin
environment:
PHPLDAPADMIN_HTTPS=false:
ports:
- "8080:80"
healthcheck:
test: echo 1
interval: 10s
@ -37,3 +34,4 @@ services:
start_period: 300s
security_opt:
- label:disable

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
zookeeper:
image: zookeeper:3.6.2
image: zookeeper:3.4.12
expose:
- "2181"
environment:

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -39,11 +40,13 @@ xfails = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse LDAP external user directory regression module.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "ldap_external_user_directory_env")) as cluster:
self.context.cluster = cluster
if stress is not None or not hasattr(self.context, "stress"):

View File

@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
import random
from multiprocessing.dummy import Pool
from helpers.common import Pool, join
from testflows.core import *
from testflows.asserts import error
@ -83,7 +83,7 @@ def login_with_invalid_username_and_valid_password(users, i, iterations=10):
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login(self, server, user_count=10, timeout=200):
def parallel_login(self, server, user_count=10, timeout=300):
"""Check that login of valid and invalid LDAP authenticated users works in parallel.
"""
self.context.ldap_node = self.context.cluster.node(server)
@ -94,28 +94,28 @@ def parallel_login(self, server, user_count=10, timeout=200):
with ldap_users(*users):
tasks = []
try:
with When("users try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("users try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_SameUser("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login_with_the_same_user(self, server, timeout=200):
def parallel_login_with_the_same_user(self, server, timeout=300):
"""Check that valid and invalid logins of the same
LDAP authenticated user works in parallel.
"""
@ -127,21 +127,20 @@ def parallel_login_with_the_same_user(self, server, timeout=200):
with ldap_users(*users):
tasks = []
try:
with When("the same user tries to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("the same user tries to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Tags("custom config")
@ -164,7 +163,7 @@ def login_after_ldap_external_user_directory_is_removed(self, server):
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_SameUser("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login_with_the_same_user_multiple_servers(self, server, timeout=200):
def parallel_login_with_the_same_user_multiple_servers(self, server, timeout=300):
"""Check that valid and invalid logins of the same
user defined in multiple LDAP external user directories
works in parallel.
@ -185,21 +184,20 @@ def parallel_login_with_the_same_user_multiple_servers(self, server, timeout=200
with ldap_users(*users, node=self.context.cluster.node("openldap1")):
with ldap_users(*users, node=self.context.cluster.node("openldap2")):
tasks = []
try:
with When("the same user tries to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("the same user tries to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Tags("custom config")
@ -207,7 +205,7 @@ def parallel_login_with_the_same_user_multiple_servers(self, server, timeout=200
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_MultipleServers("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login_with_multiple_servers(self, server, user_count=10, timeout=200):
def parallel_login_with_multiple_servers(self, server, user_count=10, timeout=300):
"""Check that login of valid and invalid LDAP authenticated users works in parallel
using multiple LDAP external user directories.
"""
@ -237,22 +235,20 @@ def parallel_login_with_multiple_servers(self, server, user_count=10, timeout=20
with ldap_users(*user_groups["openldap1_users"], node=self.context.cluster.node("openldap1")):
with ldap_users(*user_groups["openldap2_users"], node=self.context.cluster.node("openldap2")):
tasks = []
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
for users in user_groups.values():
for check in checks:
tasks.append(p.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
for users in user_groups.values():
for check in checks:
tasks.append(pool.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Tags("custom config")
@ -260,7 +256,7 @@ def parallel_login_with_multiple_servers(self, server, user_count=10, timeout=20
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_LocalAndMultipleLDAP("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_ValidAndInvalid("1.0")
)
def parallel_login_with_rbac_and_multiple_servers(self, server, user_count=10, timeout=200):
def parallel_login_with_rbac_and_multiple_servers(self, server, user_count=10, timeout=300):
"""Check that login of valid and invalid users works in parallel
using local users defined using RBAC and LDAP users authenticated using
multiple LDAP external user directories.
@ -293,28 +289,26 @@ def parallel_login_with_rbac_and_multiple_servers(self, server, user_count=10, t
with ldap_users(*user_groups["openldap2_users"], node=self.context.cluster.node("openldap2")):
with rbac_users(*user_groups["local_users"]):
tasks = []
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
for users in user_groups.values():
for check in checks:
tasks.append(p.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
for users in user_groups.values():
for check in checks:
tasks.append(pool.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Authentication_Parallel_LocalOnly("1.0")
)
def parallel_login_with_rbac_users(self, server, user_count=10, timeout=200):
def parallel_login_with_rbac_users(self, server, user_count=10, timeout=300):
"""Check that login of only valid and invalid local users created using RBAC
works in parallel when server configuration includes LDAP external user directory.
"""
@ -325,16 +319,16 @@ def parallel_login_with_rbac_users(self, server, user_count=10, timeout=200):
with rbac_users(*users):
tasks = []
try:
with When("I login in parallel"):
p = Pool(15)
for i in range(25):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("I login in parallel"):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Requirements(
@ -703,7 +697,7 @@ def empty_username_and_empty_password(self, server=None):
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_VerificationCooldown_Default("1.0")
)
def default_verification_cooldown_value(self, server, rbac=False, timeout=20):
def default_verification_cooldown_value(self, server, rbac=False):
"""Check that the default value (0) for the verification cooldown parameter
disables caching and forces contacting the LDAP server for each
authentication request.
@ -748,7 +742,7 @@ def default_verification_cooldown_value(self, server, rbac=False, timeout=20):
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_VerificationCooldown("1.0")
)
def valid_verification_cooldown_value_cn_change(self, server, rbac=False, timeout=20):
def valid_verification_cooldown_value_cn_change(self, server, rbac=False):
"""Check that we can perform requests without contacting the LDAP server
after successful authentication when the verification_cooldown parameter
is set and the user cn is changed.
@ -803,7 +797,7 @@ def valid_verification_cooldown_value_cn_change(self, server, rbac=False, timeou
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_VerificationCooldown("1.0")
)
def valid_verification_cooldown_value_password_change(self, server, rbac=False, timeout=20):
def valid_verification_cooldown_value_password_change(self, server, rbac=False):
"""Check that we can perform requests without contacting the LDAP server
after successful authentication when the verification_cooldown parameter
is set and the user password is changed.
@ -857,7 +851,7 @@ def valid_verification_cooldown_value_password_change(self, server, rbac=False,
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_VerificationCooldown("1.0")
)
def valid_verification_cooldown_value_ldap_unavailable(self, server, rbac=False, timeout=20):
def valid_verification_cooldown_value_ldap_unavailable(self, server, rbac=False):
"""Check that we can perform requests without contacting the LDAP server
after successful authentication when the verification_cooldown parameter
is set, even when the LDAP server is offline.

View File

@ -12,21 +12,6 @@ from ldap.authentication.tests.common import change_user_password_in_ldap, chang
from ldap.authentication.tests.common import create_ldap_servers_config_content
from ldap.authentication.tests.common import randomword
def join(tasks, timeout):
"""Join async tasks by waiting for their completion.
"""
task_exc = None
for task in tasks:
try:
task.get(timeout=timeout)
except Exception as exc:
if task_exc is None:
task_exc = exc
if task_exc is not None:
raise task_exc
@contextmanager
def table(name, create_statement, on_cluster=False):
node = current().context.node

View File

@ -1,6 +1,6 @@
import random
from multiprocessing.dummy import Pool
from helpers.common import Pool, join
from testflows.core import *
from testflows.asserts import error
@ -134,7 +134,7 @@ def dynamically_added_users(self, node="clickhouse1", count=10):
@Requirements(
RQ_SRS_009_LDAP_ExternalUserDirectory_Restart_Server_ParallelLogins("1.0")
)
def parallel_login(self, server=None, user_count=10, timeout=200):
def parallel_login(self, server=None, user_count=10, timeout=300):
"""Check that login of valid and invalid users works in parallel
using local users defined using RBAC and LDAP users authenticated using
multiple LDAP external user directories when server is restarted
@ -262,28 +262,28 @@ def parallel_login(self, server=None, user_count=10, timeout=200):
with ldap_users(*user_groups["openldap2_users"], node=self.context.cluster.node("openldap2")):
with rbac_users(*user_groups["local_users"]):
tasks = []
try:
with When("I restart the server during parallel login of users in each group"):
p = Pool(10)
for users in user_groups.values():
for check in checks:
tasks.append(p.apply_async(check, (users, 0, 25, True)))
tasks.append(p.apply_async(restart))
finally:
with Then("logins during restart should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("I restart the server during parallel login of users in each group"):
for users in user_groups.values():
for check in checks:
tasks.append(pool.apply_async(check, (users, 0, 25, True)))
tasks.append(pool.apply_async(restart))
finally:
with Then("logins during restart should work"):
join(tasks, timeout)
tasks = []
try:
with When("I perform parallel login of users in each group after restart"):
p = Pool(10)
for users in user_groups.values():
for check in checks:
tasks.append(p.apply_async(check, (users, 0, 10, False)))
finally:
with Then("logins after restart should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("I perform parallel login of users in each group after restart"):
for users in user_groups.values():
for check in checks:
tasks.append(pool.apply_async(check, (users, 0, 10, False)))
finally:
with Then("logins after restart should work"):
join(tasks, timeout)
@TestOutline(Feature)
@Name("restart")
@ -297,4 +297,4 @@ def feature(self, servers=None, server=None, node="clickhouse1"):
self.context.node = self.context.cluster.node(node)
for scenario in loads(current_module(), Scenario):
Scenario(test=scenario, flags=TE)()
Scenario(test=scenario)()

View File

@ -41,7 +41,7 @@ def invalid_host(self):
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_Invalid("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_Host("1.0")
)
def empty_host(self, tail=30, timeout=60):
def empty_host(self, tail=30, timeout=300):
"""Check that server returns an error when LDAP server
host value is empty.
"""
@ -57,7 +57,7 @@ def empty_host(self, tail=30, timeout=60):
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_Invalid("1.0"),
RQ_SRS_009_LDAP_ExternalUserDirectory_Configuration_Server_Host("1.0")
)
def missing_host(self, tail=30, timeout=60):
def missing_host(self, tail=30, timeout=300):
"""Check that server returns an error when LDAP server
host is missing.
"""

View File

@ -4,6 +4,7 @@ from testflows.core import *
append_path(sys.path, "..")
from helpers.common import Pool, join, run_scenario
from helpers.argparser import argparser
@TestModule
@ -12,11 +13,22 @@ from helpers.argparser import argparser
def regression(self, local, clickhouse_binary_path, parallel=None, stress=None):
"""ClickHouse LDAP integration regression module.
"""
top().terminating = False
args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path}
Feature(test=load("ldap.authentication.regression", "regression"))(**args)
Feature(test=load("ldap.external_user_directory.regression", "regression"))(**args)
Feature(test=load("ldap.role_mapping.regression", "regression"))(**args)
if stress is not None:
self.context.stress = stress
if parallel is not None:
self.context.parallel = parallel
tasks = []
with Pool(3) as pool:
try:
run_scenario(pool, tasks, Feature(test=load("ldap.authentication.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("ldap.external_user_directory.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("ldap.role_mapping.regression", "regression")), args)
finally:
join(tasks)
if main():
regression()

View File

@ -135,7 +135,7 @@ services:
zookeeper:
condition: service_healthy
# dummy service which does nothing, but allows to postpone
# dummy service which does nothing, but allows to postpone
# 'docker-compose up -d' till all dependecies will go healthy
all_services_ready:
image: hello-world

View File

@ -24,11 +24,8 @@ services:
phpldapadmin:
image: osixia/phpldapadmin:0.9.0
container_name: phpldapadmin
environment:
PHPLDAPADMIN_HTTPS=false:
ports:
- "8080:80"
healthcheck:
test: echo 1
interval: 10s
@ -37,3 +34,4 @@ services:
start_period: 300s
security_opt:
- label:disable

View File

@ -2,7 +2,7 @@ version: '2.3'
services:
zookeeper:
image: zookeeper:3.6.2
image: zookeeper:3.4.12
expose:
- "2181"
environment:

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -11,6 +12,8 @@ from ldap.role_mapping.requirements import *
# Cross-outs of known fails
xfails = {
"mapping/roles removed and added in parallel":
[(Fail, "known bug")],
"user dn detection/mapping/roles removed and added in parallel":
[(Fail, "known bug")]
}
@ -18,7 +21,7 @@ xfails = {
@Name("role mapping")
@ArgumentParser(argparser)
@Specifications(
SRS_014_ClickHouse_LDAP_Role_Mapping
QA_SRS014_ClickHouse_LDAP_Role_Mapping
)
@Requirements(
RQ_SRS_014_LDAP_RoleMapping("1.0")
@ -27,11 +30,13 @@ xfails = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse LDAP role mapping regression module.
"""
top().terminating = False
nodes = {
"clickhouse": ("clickhouse1", "clickhouse2", "clickhouse3"),
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "ldap_role_mapping_env")) as cluster:
self.context.cluster = cluster
if stress is not None or not hasattr(self.context, "stress"):
@ -42,6 +47,7 @@ def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
Scenario(run=load("ldap.authentication.tests.sanity", "scenario"), name="ldap sanity")
Feature(run=load("ldap.role_mapping.tests.server_config", "feature"))
Feature(run=load("ldap.role_mapping.tests.mapping", "feature"))
#Feature(run=load("ldap.role_mapping.tests.user_dn_detection", "feature"))
if main():
regression()

View File

@ -24,11 +24,12 @@ def create_table(self, name, create_statement, on_cluster=False):
node.query(f"DROP TABLE IF EXISTS {name}")
@TestStep(Given)
def add_ldap_servers_configuration(self, servers, config_d_dir="/etc/clickhouse-server/config.d",
def add_ldap_servers_configuration(self, servers, config=None, config_d_dir="/etc/clickhouse-server/config.d",
config_file="ldap_servers.xml", timeout=60, restart=False):
"""Add LDAP servers configuration to config.xml.
"""
config = create_ldap_servers_config_content(servers, config_d_dir, config_file)
if config is None:
config = create_ldap_servers_config_content(servers, config_d_dir, config_file)
return add_config(config, restart=restart)
@TestStep(Given)
@ -249,4 +250,4 @@ def create_ldap_external_user_directory_config_content(server=None, roles=None,
def create_entries_ldap_external_user_directory_config_content(entries, **kwargs):
"""Create LDAP external user directory configuration file content.
"""
return create_xml_config_content(entries, **kwargs)
return create_xml_config_content(entries, **kwargs)

View File

@ -2,11 +2,11 @@
from testflows.core import *
from testflows.asserts import error
from multiprocessing.dummy import Pool
from helpers.common import Pool, join
from ldap.role_mapping.requirements import *
from ldap.role_mapping.tests.common import *
from ldap.external_user_directory.tests.common import join, randomword
from ldap.external_user_directory.tests.common import randomword
from ldap.external_user_directory.tests.authentications import login_with_valid_username_and_password
from ldap.external_user_directory.tests.authentications import login_with_invalid_username_and_valid_password
@ -973,17 +973,16 @@ def group_removed_and_added_in_parallel(self, ldap_server, ldap_user, count=20,
role_mappings=role_mappings, restart=True)
tasks = []
try:
with When("user try to login while LDAP groups are added and removed in parallel"):
p = Pool(15)
for i in range(15):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(remove_ldap_groups_in_parallel, (groups, i, 10,)))
tasks.append(p.apply_async(add_ldap_groups_in_parallel,(ldap_user, role_names, i, 10,)))
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("user try to login while LDAP groups are added and removed in parallel"):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(remove_ldap_groups_in_parallel, (groups, i, 10,)))
tasks.append(pool.apply_async(add_ldap_groups_in_parallel,(ldap_user, role_names, i, 10,)))
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
finally:
with Finally("I clean up all LDAP groups"):
for group in groups:
@ -1026,17 +1025,16 @@ def user_removed_and_added_in_ldap_groups_in_parallel(self, ldap_server, ldap_us
role_mappings=role_mappings, restart=True)
tasks = []
try:
with When("user try to login while user is added and removed from LDAP groups in parallel"):
p = Pool(15)
for i in range(15):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(remove_user_from_ldap_groups_in_parallel, (ldap_user, groups, i, 1,)))
tasks.append(p.apply_async(add_user_to_ldap_groups_in_parallel, (ldap_user, groups, i, 1,)))
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("user try to login while user is added and removed from LDAP groups in parallel"):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(remove_user_from_ldap_groups_in_parallel, (ldap_user, groups, i, 1,)))
tasks.append(pool.apply_async(add_user_to_ldap_groups_in_parallel, (ldap_user, groups, i, 1,)))
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
@TestScenario
@Requirements(
@ -1076,22 +1074,21 @@ def roles_removed_and_added_in_parallel(self, ldap_server, ldap_user, count=20,
role_mappings=role_mappings, restart=True)
tasks = []
try:
with When("user try to login while mapped roles are added and removed in parallel"):
p = Pool(15)
for i in range(15):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(remove_roles_in_parallel, (role_names, i, 10,)))
tasks.append(p.apply_async(add_roles_in_parallel, (role_names, i, 10,)))
with Pool(4) as pool:
try:
with When("user try to login while mapped roles are added and removed in parallel"):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(remove_roles_in_parallel, (role_names, i, 10,)))
tasks.append(pool.apply_async(add_roles_in_parallel, (role_names, i, 10,)))
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
finally:
with Finally("it should work", flags=TE):
join(tasks, timeout)
with And("I clean up all the roles"):
for role_name in role_names:
with By(f"dropping role {role_name}", flags=TE):
self.context.node.query(f"DROP ROLE IF EXISTS {role_name}")
with And("I clean up all the roles"):
for role_name in role_names:
with By(f"dropping role {role_name}", flags=TE):
self.context.node.query(f"DROP ROLE IF EXISTS {role_name}")
@TestOutline
def parallel_login(self, ldap_server, ldap_user, user_count=10, timeout=200, role_count=10):
@ -1132,21 +1129,20 @@ def parallel_login(self, ldap_server, ldap_user, user_count=10, timeout=200, rol
role_mappings=role_mappings, restart=True)
tasks = []
try:
with When("users try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
tasks.append(p.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(p.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("users try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
tasks.append(pool.apply_async(login_with_valid_username_and_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_valid_username_and_invalid_password, (users, i, 50,)))
tasks.append(pool.apply_async(login_with_invalid_username_and_valid_password, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestScenario
@Requirements(
@ -1313,22 +1309,20 @@ def parallel_login_with_multiple_servers(self, ldap_server, ldap_user, user_coun
self.context.node.query(f"GRANT {role_name} TO {user['cn']}")
tasks = []
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
p = Pool(15)
for i in range(25):
for users in user_groups.values():
for check in checks:
tasks.append(p.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
with Pool(4) as pool:
try:
with When("users in each group try to login in parallel", description="""
* with valid username and password
* with invalid username and valid password
* with valid username and invalid password
"""):
for i in range(10):
for users in user_groups.values():
for check in checks:
tasks.append(pool.apply_async(check, (users, i, 50,)))
finally:
with Then("it should work"):
join(tasks, timeout)
@TestFeature
@Name("mapping")

View File

@ -0,0 +1,475 @@
# -*- coding: utf-8 -*-
import importlib
from testflows.core import *
from testflows.asserts import error
from ldap.role_mapping.requirements import *
from ldap.role_mapping.tests.common import *
@TestOutline
def check_config(self, entries, valid=True, ldap_server="openldap1", user="user1", password="user1"):
"""Apply LDAP server configuration and check login.
"""
if valid:
exitcode = 0
message = "1"
else:
exitcode = 4
message = "DB::Exception: user1: Authentication failed: password is incorrect or there is no user with such name"
with Given("I add LDAP server configuration"):
config = create_xml_config_content(entries=entries, config_file="ldap_servers.xml")
add_ldap_servers_configuration(servers=None, config=config)
with And("I add LDAP external user directory configuration"):
add_ldap_external_user_directory(server=ldap_server,
role_mappings=None, restart=True)
with When(f"I login I try to login as an LDAP user"):
r = self.context.node.query(f"SELECT 1", settings=[
("user", user), ("password", password)], exitcode=exitcode, message=message)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_invalid_base_dn(self):
"""Check when invalid `base_dn` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid base_dn"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=user,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_empty_base_dn(self):
"""Check when empty `base_dn` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid base_dn"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_missing_base_dn(self):
"""Check when missing `base_dn` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid base_dn"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_invalid_search_filter(self):
"""Check when invalid `search_filter` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid search_filter"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPersons)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_missing_search_filter(self):
"""Check when missing `search_filter` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid search_filter"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_empty_search_filter(self):
"""Check when empty `search_filter` is specified in the user_dn_detection section.
"""
with Given("I define LDAP server configuration with invalid search_filter"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": ""
}
}
}
]
}
check_config(entries=entries, valid=False)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_valid(self):
"""Check valid config with valid user_dn_detection section.
"""
with Given("I define LDAP server configuration"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=True)
@TestScenario
@Tags("config")
@Requirements(
# FIXME
)
def config_valid_tls_connection(self):
"""Check valid config with valid user_dn_detection section when
using LDAP that is configured to use TLS connection.
"""
with Given("I define LDAP server configuration"):
entries = {
"ldap_servers": [
{
"openldap2": {
"host": "openldap2",
"port": "636",
"enable_tls": "yes",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"tls_require_cert": "never",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
}
]
}
check_config(entries=entries, valid=True, ldap_server="openldap2", user="user2", password="user2")
@TestOutline(Scenario)
@Examples("scope base_dn", [
("base", "cn=user1,ou=users,dc=company,dc=com"),
("one_level","ou=users,dc=company,dc=com"),
("children","ou=users,dc=company,dc=com"),
("subtree","ou=users,dc=company,dc=com") # default value
])
def check_valid_scope_values(self, scope, base_dn):
"""Check configuration with valid scope values.
"""
with Given("I define LDAP server configuration"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": base_dn,
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))",
"scope": scope
}
}
}
]
}
check_config(entries=entries, valid=True)
@TestSuite
def mapping(self):
"""Run all role mapping tests with both
openldap1 and openldap2 configured to use
user DN detection.
"""
users = [
{"server": "openldap1", "username": "user1", "password": "user1", "login": True,
"dn": "cn=user1,ou=users,dc=company,dc=com"},
]
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
},
"openldap2": {
"host": "openldap2",
"port": "636",
"enable_tls": "yes",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
"tls_require_cert": "never",
"user_dn_detection": {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))"
}
}
},
]
}
with Given("I add LDAP servers configuration"):
config = create_xml_config_content(entries=entries, config_file="ldap_servers.xml")
add_ldap_servers_configuration(servers=None, config=config)
for scenario in loads(importlib.import_module("ldap.role_mapping.tests.mapping"), Scenario):
scenario(ldap_server="openldap1", ldap_user=users[0])
@TestOutline
def setup_different_bind_dn_and_user_dn(self, uid, map_by, user_dn_detection):
"""Check that roles get mapped properly when bind_dn and user_dn are different
by creating LDAP users that have switched uid parameter values.
"""
with Given("I define LDAP server configuration"):
entries = {
"ldap_servers": [
{
"openldap1": {
"host": "openldap1",
"port": "389",
"enable_tls": "no",
"bind_dn": "cn={user_name},ou=users,dc=company,dc=com",
}
}
]
}
if user_dn_detection:
with And("I enable user dn detection"):
entries["ldap_servers"][0]["openldap1"]["user_dn_detection"] = {
"base_dn": "ou=users,dc=company,dc=com",
"search_filter": "(&(objectClass=inetOrgPerson)(uid={user_name}))",
"scope": "subtree"
}
with And("I define role mappings"):
role_mappings = [
{
"base_dn": "ou=groups,dc=company,dc=com",
"attribute": "cn",
"search_filter": f"(&(objectClass=groupOfUniqueNames)(uniquemember={{{map_by}}}))",
"prefix":""
}
]
with Given("I add LDAP users"):
first_user = add_ldap_users(users=[
{"cn": f"first_user", "userpassword": "user", "uid": "second_user"}
])[0]
second_user = add_ldap_users(users=[
{"cn": f"second_user", "userpassword": "user", "uid": "first_user"}
])[0]
with Given("I add LDAP groups"):
groups = add_ldap_groups(groups=({"cn": f"role0_{uid}"}, {"cn": f"role1_{uid}"}))
with And("I add LDAP user to each LDAP group"):
with By("adding first group to first user"):
add_user_to_group_in_ldap(user=first_user, group=groups[0])
with And("adding second group to second user"):
add_user_to_group_in_ldap(user=second_user, group=groups[1])
with And("I add RBAC roles"):
roles = add_rbac_roles(roles=(f"role0_{uid}", f"role1_{uid}"))
with Given("I add LDAP server configuration"):
config = create_xml_config_content(entries=entries, config_file="ldap_servers.xml")
add_ldap_servers_configuration(servers=None, config=config)
with And("I add LDAP external user directory configuration"):
add_ldap_external_user_directory(server=self.context.ldap_node.name,
role_mappings=role_mappings, restart=True)
@TestScenario
@Requirements(
# FIXME:
)
def map_roles_by_user_dn_when_base_dn_and_user_dn_are_different(self):
"""Check the case when we map roles using user_dn then
the first user has uid of second user and second user
has uid of first user and configuring user DN detection to
determine user_dn based on the uid value so that user_dn
for the first user will be bind_dn of the second user and
vice versa.
"""
uid = getuid()
setup_different_bind_dn_and_user_dn(uid=uid, map_by="user_dn", user_dn_detection=True)
with When(f"I login as first LDAP user"):
r = self.context.node.query(f"SHOW GRANTS", settings=[
("user", "first_user"), ("password", "user")])
with Then("I expect the first user to have mapped LDAP roles from second user"):
assert f"GRANT role1_{uid} TO first_user" in r.output, error()
with When(f"I login as second LDAP user"):
r = self.context.node.query(f"SHOW GRANTS", settings=[
("user", "second_user"), ("password", "user")])
with Then("I expect the second user to have mapped LDAP roles from first user"):
assert f"GRANT role0_{uid} TO second_user" in r.output, error()
@TestScenario
@Requirements(
# FIXME:
)
def map_roles_by_bind_dn_when_base_dn_and_user_dn_are_different(self):
"""Check the case when we map roles by bind_dn when bind_dn and user_dn
are different.
"""
uid = getuid()
setup_different_bind_dn_and_user_dn(uid=uid, map_by="bind_dn", user_dn_detection=True)
with When(f"I login as first LDAP user"):
r = self.context.node.query(f"SHOW GRANTS", settings=[
("user", "first_user"), ("password", "user")])
with Then("I expect the first user to have no mapped LDAP roles"):
assert f"GRANT role0_{uid} TO first_user" == r.output, error()
with When(f"I login as second LDAP user"):
r = self.context.node.query(f"SHOW GRANTS", settings=[
("user", "second_user"), ("password", "user")])
with Then("I expect the second user to have no mapped LDAP roles"):
assert f"GRANT role1_{uid} TO second_user" in r.output, error()
@TestFeature
@Name("user dn detection")
@Requirements(
#RQ_SRS_014_LDAP_UserDNDetection("1.0")
)
def feature(self):
"""Check LDAP user DN detection.
"""
self.context.node = self.context.cluster.node("clickhouse1")
self.context.ldap_node = self.context.cluster.node("openldap1")
with Given("I fix LDAP access permissions"):
fix_ldap_permissions(node=self.context.cluster.node("openldap1"))
fix_ldap_permissions(node=self.context.cluster.node("openldap2"))
for scenario in ordered(loads(current_module(), Scenario)):
scenario()
Suite(run=mapping)

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -7,7 +8,7 @@ append_path(sys.path, "..")
from helpers.cluster import Cluster
from helpers.argparser import argparser
from map_type.requirements import SRS018_ClickHouse_Map_Data_Type
from map_type.requirements import SRS018_ClickHouse_Map_Data_Type
xfails = {
"tests/table map with key integer/Int:":
@ -65,7 +66,7 @@ xfails = {
"tests/select map with key integer/toNullable(NULL)":
[(Fail, "Nullable type as key not supported")],
"tests/select map with key string/Nullable":
[(Fail, "Nullable type as key not supported")],
[(Fail, "Nullable type as key not supported")],
"tests/select map with key string/Nullable(NULL)":
[(Fail, "Nullable type as key not supported")],
"tests/table map queries/select map with nullable value":
@ -104,11 +105,14 @@ xflags = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""Map type regression.
"""
top().terminating = False
nodes = {
"clickhouse":
("clickhouse1", "clickhouse2", "clickhouse3")
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "map_type_env")) as cluster:
self.context.cluster = cluster
self.context.stress = stress

View File

@ -1,99 +1,16 @@
import uuid
import testflows.settings as settings
from contextlib import contextmanager
from multiprocessing.dummy import Pool
from multiprocessing import TimeoutError as PoolTaskTimeoutError
from testflows.core.name import basename, parentname
from testflows._core.testtype import TestSubType
from testflows.core import *
from helpers.common import Pool, join, run_scenario, instrument_clickhouse_server_log
from rbac.helper.tables import table_types
@TestStep(Given)
def instrument_clickhouse_server_log(self, node=None, clickhouse_server_log="/var/log/clickhouse-server/clickhouse-server.log"):
"""Instrument clickhouse-server.log for the current test
by adding start and end messages that include
current test name to the clickhouse-server.log of the specified node and
if the test fails then dump the messages from
the clickhouse-server.log for this test.
"""
if node is None:
node = self.context.node
with By("getting current log size"):
cmd = node.command(f"stat --format=%s {clickhouse_server_log}")
logsize = cmd.output.split(" ")[0].strip()
try:
with And("adding test name start message to the clickhouse-server.log"):
node.command(f"echo -e \"\\n-- start: {current().name} --\\n\" >> {clickhouse_server_log}")
with And("dump memory info"):
node.command(f"echo -e \"\\n-- {current().name} -- top --\\n\" && top -bn1")
node.command(f"echo -e \"\\n-- {current().name} -- df --\\n\" && df -h")
node.command(f"echo -e \"\\n-- {current().name} -- free --\\n\" && free -mh")
yield
finally:
if self.context.cluster.terminating is True:
return
with Finally("adding test name end message to the clickhouse-server.log", flags=TE):
node.command(f"echo -e \"\\n-- end: {current().name} --\\n\" >> {clickhouse_server_log}")
with And("checking if test has failing result"):
if not self.parent.result:
with Then("dumping clickhouse-server.log for this test"):
node.command(f"tail -c +{logsize} {clickhouse_server_log}")
def join(tasks, polling_timeout=5):
"""Join all parallel tests.
"""
exc = None
while tasks:
try:
try:
tasks[0].get(timeout=polling_timeout)
tasks.pop(0)
except PoolTaskTimeoutError as e:
task = tasks.pop(0)
tasks.append(task)
continue
except KeyboardInterrupt as e:
current().context.cluster.terminating = True
continue
except Exception as e:
tasks.pop(0)
if exc is None:
exc = e
current().context.cluster.terminating = True
if exc is not None:
raise exc
def start(pool, tasks, scenario, kwargs=None):
"""Start parallel test.
"""
if kwargs is None:
kwargs = {}
task = pool.apply_async(scenario, [], kwargs)
tasks.append(task)
return task
def run_scenario(pool, tasks, scenario, kwargs=None):
if kwargs is None:
kwargs = {}
if current().context.parallel:
start(pool, tasks, scenario, kwargs)
else:
scenario(**kwargs)
def permutations(table_count=1):
return [*range((1 << table_count)-1)]

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -155,11 +156,14 @@ xflags = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""RBAC regression.
"""
top().terminating = False
nodes = {
"clickhouse":
("clickhouse1", "clickhouse2", "clickhouse3")
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "rbac_env")) as cluster:
self.context.cluster = cluster
self.context.stress = stress

View File

@ -699,8 +699,7 @@ def user_with_privileges_on_cluster(self, permutation, table_type, node=None):
@TestSuite
def scenario_parallelization(self, table_type, permutation):
pool = Pool(7)
try:
with Pool(7) as pool:
tasks = []
try:
for scenario in loads(current_module(), Scenario):
@ -708,8 +707,6 @@ def scenario_parallelization(self, table_type, permutation):
{"table_type": table_type, "permutation": permutation})
finally:
join(tasks)
finally:
pool.close()
@TestFeature
@Requirements(
@ -739,8 +736,7 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
continue
with Example(str(example)):
pool = Pool(10)
try:
with Pool(10) as pool:
tasks = []
try:
for permutation in permutations(table_type):
@ -750,5 +746,3 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
{"table_type": table_type, "permutation": permutation})
finally:
join(tasks)
finally:
pool.close()

View File

@ -297,13 +297,10 @@ def feature(self, node="clickhouse1", parallel=None, stress=None):
continue
with Example(str(example)):
pool = Pool(5)
try:
with Pool(5) as pool:
tasks = []
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario, setup=instrument_clickhouse_server_log), {"table_type" : table_type})
finally:
join(tasks)
finally:
pool.close()

View File

@ -469,13 +469,10 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
continue
with Example(str(example)):
pool = Pool(5)
try:
with Pool(5) as pool:
tasks = []
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario, setup=instrument_clickhouse_server_log), {"table_type" : table_type})
finally:
join(tasks)
finally:
pool.close()

View File

@ -1,5 +1,3 @@
from multiprocessing.dummy import Pool
from testflows.core import *
from testflows.asserts import error

View File

@ -171,16 +171,13 @@ def user_with_privileges_on_cluster(self, privilege, table_type, node=None):
def scenario_parallelization(self, table_type, privilege):
"""Runs all scenarios in parallel for a given privilege.
"""
pool = Pool(4)
try:
with Pool(4) as pool:
tasks = []
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario), {"table_type": table_type, "privilege": privilege})
finally:
join(tasks)
finally:
pool.close()
@TestFeature
@Requirements(
@ -210,8 +207,7 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
continue
with Example(str(example)):
pool = Pool(4)
try:
with Pool(4) as pool:
tasks = []
try:
for alias in aliases:
@ -220,5 +216,3 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
{"table_type": table_type, "privilege": alias})
finally:
join(tasks)
finally:
pool.close()

View File

@ -273,13 +273,10 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
continue
with Example(str(example)):
pool = Pool(5)
try:
with Pool(5) as pool:
tasks = []
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario, setup=instrument_clickhouse_server_log), {"table_type" : table_type})
finally:
join(tasks)
finally:
pool.close()

View File

@ -1,5 +1,3 @@
from multiprocessing.dummy import Pool
from testflows.core import *
from testflows.asserts import error

View File

@ -840,13 +840,9 @@ def feature(self, stress=None, parallel=None, node="clickhouse1"):
self.context.stress = parallel
tasks = []
pool = Pool(10)
try:
with Pool(10) as pool:
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, scenario)
finally:
join(tasks)
finally:
pool.close()

View File

@ -661,8 +661,7 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
if stress is not None:
self.context.stress = stress
pool = Pool(20)
try:
with Pool(20) as pool:
tasks = []
try:
@ -686,5 +685,3 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
finally:
join(tasks)
finally:
pool.close()

View File

@ -1320,16 +1320,12 @@ def cluster_tests(self, cluster, node=None):
self.context.cluster_name = cluster
tasks = []
pool = Pool(3)
try:
with Pool(3) as pool:
try:
for suite in loads(current_module(), Suite):
run_scenario(pool, tasks, Suite(test=suite))
finally:
join(tasks)
finally:
pool.close()
@TestFeature
@Requirements(
@ -1345,9 +1341,7 @@ def feature(self, node="clickhouse1"):
self.context.node3 = self.context.cluster.node("clickhouse3")
tasks = []
pool = Pool(3)
try:
with Pool(3) as pool:
try:
run_scenario(pool, tasks, Feature(test=cluster_tests))
run_scenario(pool, tasks, Scenario(test=local_user))
@ -1355,6 +1349,4 @@ def feature(self, node="clickhouse1"):
finally:
join(tasks)
finally:
pool.close()

View File

@ -7,9 +7,7 @@ from rbac.helper.common import *
def feature(self):
tasks = []
pool = Pool(10)
try:
with Pool(10) as pool:
try:
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.insert", "feature")), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.privileges.select", "feature"), ), {})
@ -96,7 +94,5 @@ def feature(self):
finally:
join(tasks)
finally:
pool.close()
Feature(test=load("rbac.tests.privileges.system.shutdown", "feature"))

View File

@ -1,5 +1,3 @@
from multiprocessing.dummy import Pool
from testflows.core import *
from testflows.asserts import error
@ -89,7 +87,7 @@ def grant_option_check(grant_option_target, grant_target, user_name, table_type,
@Examples("privilege", [
("ALTER MOVE PARTITION",), ("ALTER MOVE PART",), ("MOVE PARTITION",), ("MOVE PART",),
("ALTER DELETE",), ("DELETE",),
("ALTER FETCH PARTITION",), ("ALTER FETCH PART",), ("FETCH PARTITION",),
("ALTER FETCH PARTITION",), ("FETCH PARTITION",),
("ALTER FREEZE PARTITION",), ("FREEZE PARTITION",),
("ALTER UPDATE",), ("UPDATE",),
("ALTER ADD COLUMN",), ("ADD COLUMN",),
@ -126,8 +124,7 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
if stress is not None:
self.context.stress = stress
pool = Pool(12)
try:
with Pool(12) as pool:
tasks = []
try:
for example in self.examples:
@ -135,5 +132,3 @@ def feature(self, node="clickhouse1", stress=None, parallel=None):
run_scenario(pool, tasks, Suite(test=grant_option, name=privilege, setup=instrument_clickhouse_server_log), {"table_type": "MergeTree", "privilege": privilege})
finally:
join(tasks)
finally:
pool.close()

View File

@ -500,13 +500,9 @@ def feature(self, table_type, parallel=None, stress=None, node="clickhouse1"):
self.context.stress = parallel
tasks = []
pool = Pool(10)
try:
with Pool(10) as pool:
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario, setup=instrument_clickhouse_server_log), {"table_type" : table_type})
finally:
join(tasks)
finally:
pool.close()

View File

@ -419,13 +419,9 @@ def feature(self, table_type, parallel=None, stress=None, node="clickhouse1"):
self.context.stress = parallel
tasks = []
pool = Pool(10)
try:
with Pool(10) as pool:
try:
for scenario in loads(current_module(), Scenario):
run_scenario(pool, tasks, Scenario(test=scenario, setup=instrument_clickhouse_server_log), {"table_type" : table_type})
finally:
join(tasks)
finally:
pool.close()

View File

@ -166,10 +166,15 @@ def feature(self, node="clickhouse1"):
with Scenario("I revoke a role on fake cluster, throws exception", requirements=[
RQ_SRS_006_RBAC_Revoke_Role_Cluster("1.0")]):
with setup():
with When("I revoke a role from user on a cluster"):
exitcode, message = errors.cluster_not_found("fake_cluster")
node.query("REVOKE ON CLUSTER fake_cluster role0 FROM user0", exitcode=exitcode, message=message)
with Given("I have a role and a user on a cluster"):
node.query("CREATE USER OR REPLACE user0")
node.query("CREATE ROLE OR REPLACE role0")
with When("I revoke a role from user on a cluster"):
exitcode, message = errors.cluster_not_found("fake_cluster")
node.query("REVOKE ON CLUSTER fake_cluster role0 FROM user0", exitcode=exitcode, message=message)
with Finally("I drop the user and role"):
node.query("DROP USER IF EXISTS user0")
node.query("DROP ROLE IF EXISTS role0")
with Scenario("I revoke multiple roles from multiple users on cluster", requirements=[
RQ_SRS_006_RBAC_Revoke_Role("1.0"),
@ -196,4 +201,4 @@ def feature(self, node="clickhouse1"):
RQ_SRS_006_RBAC_Revoke_AdminOption("1.0")]):
with setup():
with When("I revoke admin option for multiple roles from multiple users"):
node.query("REVOKE ADMIN OPTION FOR role0, role1 FROM user0, user1")
node.query("REVOKE ADMIN OPTION FOR role0, role1 FROM user0, user1")

View File

@ -7,14 +7,10 @@ from rbac.helper.common import *
def feature(self):
tasks = []
pool = Pool(3)
try:
with Pool(3) as pool:
try:
run_scenario(pool, tasks, Feature(test=load("rbac.tests.views.view", "feature")), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.views.live_view", "feature")), {})
run_scenario(pool, tasks, Feature(test=load("rbac.tests.views.materialized_view", "feature")), {})
finally:
join(tasks)
finally:
pool.close()

View File

@ -1130,16 +1130,11 @@ def feature(self, stress=None, parallel=None, node="clickhouse1"):
if parallel is not None:
self.context.stress = parallel
tasks = []
pool = Pool(3)
with allow_experimental_live_view(self.context.node):
try:
tasks = []
with Pool(3) as pool:
try:
for suite in loads(current_module(), Suite):
run_scenario(pool, tasks, suite)
finally:
join(tasks)
finally:
pool.close()

View File

@ -2268,13 +2268,9 @@ def feature(self, stress=None, parallel=None, node="clickhouse1"):
self.context.stress = parallel
tasks = []
pool = Pool(3)
try:
with Pool(3) as pool:
try:
for suite in loads(current_module(), Suite):
run_scenario(pool, tasks, suite)
finally:
join(tasks)
finally:
pool.close()

View File

@ -1150,13 +1150,9 @@ def feature(self, stress=None, parallel=None, node="clickhouse1"):
self.context.stress = parallel
tasks = []
pool = Pool(3)
try:
with Pool(3) as pool:
try:
for suite in loads(current_module(), Suite):
run_scenario(pool, tasks, suite)
finally:
join(tasks)
finally:
pool.close()

View File

@ -4,6 +4,7 @@ from testflows.core import *
append_path(sys.path, ".")
from helpers.common import Pool, join, run_scenario
from helpers.argparser import argparser
@TestModule
@ -12,15 +13,20 @@ from helpers.argparser import argparser
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""ClickHouse regression.
"""
top().terminating = False
args = {"local": local, "clickhouse_binary_path": clickhouse_binary_path, "stress": stress, "parallel": parallel}
# Feature(test=load("example.regression", "regression"))(**args)
# Feature(test=load("ldap.regression", "regression"))(**args)
# Feature(test=load("rbac.regression", "regression"))(**args)
# Feature(test=load("aes_encryption.regression", "regression"))(**args)
Feature(test=load("map_type.regression", "regression"))(**args)
Feature(test=load("window_functions.regression", "regression"))(**args)
# Feature(test=load("kerberos.regression", "regression"))(**args)
tasks = []
with Pool(7) as pool:
try:
# run_scenario(pool, tasks, Feature(test=load("example.regression", "regression")), args)
# run_scenario(pool, tasks, Feature(test=load("ldap.regression", "regression")), args)
# run_scenario(pool, tasks, Feature(test=load("rbac.regression", "regression")), args)
# run_scenario(pool, tasks, Feature(test=load("aes_encryption.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("map_type.regression", "regression")), args)
run_scenario(pool, tasks, Feature(test=load("window_functions.regression", "regression")), args)
# run_scenario(pool, tasks, Feature(test=load("kerberos.regression", "regression")), args)
finally:
join(tasks)
if main():
regression()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python3
import os
import sys
from testflows.core import *
@ -79,11 +80,14 @@ xflags = {
def regression(self, local, clickhouse_binary_path, stress=None, parallel=None):
"""Window functions regression.
"""
top().terminating = False
nodes = {
"clickhouse":
("clickhouse1", "clickhouse2", "clickhouse3")
}
with Cluster(local, clickhouse_binary_path, nodes=nodes) as cluster:
with Cluster(local, clickhouse_binary_path, nodes=nodes,
docker_compose_project_dir=os.path.join(current_dir(), "window_functions_env")) as cluster:
self.context.cluster = cluster
self.context.stress = stress