mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-12 01:12:12 +00:00
96 lines
3.0 KiB
Python
96 lines
3.0 KiB
Python
import logging
|
|
import pytest
|
|
from helpers.cluster import ClickHouseCluster
|
|
from helpers.test_tools import TSV
|
|
|
|
LDAP_ADMIN_BIND_DN = "cn=admin,dc=example,dc=org"
|
|
LDAP_ADMIN_PASSWORD = "clickhouse"
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"instance", main_configs=["configs/ldap_with_role_mapping.xml"], with_ldap=True
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def ldap_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def add_ldap_group(ldap_cluster, group_cn, member_cn):
|
|
code, (stdout, stderr) = ldap_cluster.ldap_container.exec_run(
|
|
[
|
|
"sh",
|
|
"-c",
|
|
"""echo "dn: cn={group_cn},dc=example,dc=org
|
|
objectClass: top
|
|
objectClass: groupOfNames
|
|
member: cn={member_cn},ou=users,dc=example,dc=org" | \
|
|
ldapadd -H ldap://{host}:{port} -D "{admin_bind_dn}" -x -w {admin_password}
|
|
""".format(
|
|
host=ldap_cluster.ldap_host,
|
|
port=ldap_cluster.ldap_port,
|
|
admin_bind_dn=LDAP_ADMIN_BIND_DN,
|
|
admin_password=LDAP_ADMIN_PASSWORD,
|
|
group_cn=group_cn,
|
|
member_cn=member_cn,
|
|
),
|
|
],
|
|
demux=True,
|
|
)
|
|
logging.debug(
|
|
f"test_ldap_external_user_directory code:{code} stdout:{stdout}, stderr:{stderr}"
|
|
)
|
|
assert code == 0
|
|
|
|
|
|
def test_authentication_pass():
|
|
assert instance.query(
|
|
"select currentUser()", user="janedoe", password="qwerty"
|
|
) == TSV([["janedoe"]])
|
|
|
|
|
|
def test_authentication_fail():
|
|
# User doesn't exist.
|
|
assert "doesnotexist: Authentication failed" in instance.query_and_get_error(
|
|
"SELECT currentUser()", user="doesnotexist"
|
|
)
|
|
|
|
# Wrong password.
|
|
assert "janedoe: Authentication failed" in instance.query_and_get_error(
|
|
"SELECT currentUser()", user="janedoe", password="123"
|
|
)
|
|
|
|
|
|
def test_role_mapping(ldap_cluster):
|
|
instance.query("CREATE ROLE role_1")
|
|
instance.query("CREATE ROLE role_2")
|
|
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_1", member_cn="johndoe")
|
|
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_2", member_cn="johndoe")
|
|
|
|
assert instance.query(
|
|
"select currentUser()", user="johndoe", password="qwertz"
|
|
) == TSV([["johndoe"]])
|
|
|
|
assert instance.query(
|
|
"select role_name from system.current_roles ORDER BY role_name",
|
|
user="johndoe",
|
|
password="qwertz",
|
|
) == TSV([["role_1"], ["role_2"]])
|
|
|
|
instance.query("CREATE ROLE role_3")
|
|
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_3", member_cn="johndoe")
|
|
# Check that non-existing role in ClickHouse is ignored during role update
|
|
# See https://github.com/ClickHouse/ClickHouse/issues/54318
|
|
add_ldap_group(ldap_cluster, group_cn="clickhouse-role_4", member_cn="johndoe")
|
|
|
|
assert instance.query(
|
|
"select role_name from system.current_roles ORDER BY role_name",
|
|
user="johndoe",
|
|
password="qwertz",
|
|
) == TSV([["role_1"], ["role_2"], ["role_3"]])
|