ClickHouse/tests/integration/test_ssl_cert_authentication/test.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

351 lines
10 KiB
Python
Raw Normal View History

import pytest
from helpers.client import Client
from helpers.cluster import ClickHouseCluster
from helpers.ssl_context import WrapSSLContextWithSNI
import ssl
import os.path
from os import remove
import urllib3
2022-08-09 10:52:16 +00:00
# The test cluster is configured with certificate for that host name, see 'server-ext.cnf'.
# The client have to verify server certificate against that name. Client uses SNI
SSL_HOST = "integration-tests.clickhouse.com"
HTTPS_PORT = 8443
2022-08-01 11:16:12 +00:00
# It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
"node",
main_configs=[
"configs/ssl_config.xml",
"certs/server-key.pem",
"certs/server-cert.pem",
"certs/ca-cert.pem",
],
user_configs=["configs/users_with_ssl_auth.xml"],
)
@pytest.fixture(scope="module", autouse=True)
def started_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
config = """<clickhouse>
<openSSL>
<client>
<verificationMode>none</verificationMode>
<certificateFile>{certificateFile}</certificateFile>
<privateKeyFile>{privateKeyFile}</privateKeyFile>
<caConfig>{caConfig}</caConfig>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
</clickhouse>"""
2023-04-21 14:22:14 +00:00
def execute_query_native(node, query, user, cert_name, password=None):
config_path = f"{SCRIPT_DIR}/configs/client.xml"
formatted = config.format(
certificateFile=f"{SCRIPT_DIR}/certs/{cert_name}-cert.pem",
privateKeyFile=f"{SCRIPT_DIR}/certs/{cert_name}-key.pem",
caConfig=f"{SCRIPT_DIR}/certs/ca-cert.pem",
)
file = open(config_path, "w")
file.write(formatted)
file.close()
client = Client(
node.ip_address,
9440,
command=cluster.client_bin_path,
secure=True,
config=config_path,
)
try:
2023-04-21 14:22:14 +00:00
result = client.query(query, user=user, password=password)
remove(config_path)
return result
except:
remove(config_path)
raise
2023-04-21 14:22:14 +00:00
def test_native():
assert (
2023-04-21 14:22:14 +00:00
execute_query_native(
instance, "SELECT currentUser()", user="john", cert_name="client1"
)
== "john\n"
)
assert (
2023-04-21 14:22:14 +00:00
execute_query_native(
instance, "SELECT currentUser()", user="lucy", cert_name="client2"
)
== "lucy\n"
)
assert (
2023-04-21 14:22:14 +00:00
execute_query_native(
instance, "SELECT currentUser()", user="lucy", cert_name="client3"
)
== "lucy\n"
)
2023-04-21 14:22:14 +00:00
def test_native_wrong_cert():
# Wrong certificate: different user's certificate
with pytest.raises(Exception) as err:
execute_query_native(
instance, "SELECT currentUser()", user="john", cert_name="client2"
)
assert "AUTHENTICATION_FAILED" in str(err.value)
# Wrong certificate: self-signed certificate.
# In this case clickhouse-client itself will throw an error
with pytest.raises(Exception) as err:
execute_query_native(
instance, "SELECT currentUser()", user="john", cert_name="wrong"
)
assert "UNKNOWN_CA" in str(err.value)
def test_native_fallback_to_password():
# Unrelated certificate, correct password
assert (
execute_query_native(
2023-04-21 14:22:14 +00:00
instance,
"SELECT currentUser()",
user="jane",
cert_name="client2",
password="qwe123",
)
2023-04-21 14:22:14 +00:00
== "jane\n"
)
2023-04-21 14:22:14 +00:00
# Unrelated certificate, wrong password
with pytest.raises(Exception) as err:
execute_query_native(
2023-04-21 14:22:14 +00:00
instance,
"SELECT currentUser()",
user="jane",
cert_name="client2",
password="wrong",
)
assert "AUTHENTICATION_FAILED" in str(err.value)
def get_ssl_context(cert_name):
context = WrapSSLContextWithSNI(SSL_HOST, ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations(cafile=f"{SCRIPT_DIR}/certs/ca-cert.pem")
if cert_name:
context.load_cert_chain(
f"{SCRIPT_DIR}/certs/{cert_name}-cert.pem",
f"{SCRIPT_DIR}/certs/{cert_name}-key.pem",
)
2023-04-21 14:22:14 +00:00
context.verify_mode = ssl.CERT_REQUIRED
context.check_hostname = True
return context
def execute_query_https(
query, user, enable_ssl_auth=True, cert_name=None, password=None
):
2023-05-22 08:20:45 +00:00
url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={query}"
2023-05-22 08:30:04 +00:00
headers = {"X-ClickHouse-User": user}
2023-05-22 08:20:45 +00:00
if enable_ssl_auth:
headers["X-ClickHouse-SSL-Certificate-Auth"] = "on"
if password:
headers["X-ClickHouse-Key"] = password
2023-05-22 08:30:04 +00:00
http_client = urllib3.PoolManager(ssl_context=get_ssl_context(cert_name))
response = http_client.request("GET", url, headers=headers)
2023-05-22 08:20:45 +00:00
if response.status != 200:
raise Exception(response.status)
return response.data.decode("utf-8")
2023-04-21 14:22:14 +00:00
def test_https():
assert (
execute_query_https("SELECT currentUser()", user="john", cert_name="client1")
== "john\n"
)
assert (
execute_query_https("SELECT currentUser()", user="lucy", cert_name="client2")
== "lucy\n"
)
assert (
2023-04-21 14:22:14 +00:00
execute_query_https("SELECT currentUser()", user="lucy", cert_name="client3")
== "lucy\n"
)
def test_https_wrong_cert():
# Wrong certificate: different user's certificate
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="client2")
assert "403" in str(err.value)
# Wrong certificate: self-signed certificate.
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="wrong")
assert "unknown ca" in str(err.value)
# No certificate.
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john")
assert "403" in str(err.value)
# No header enabling SSL authentication.
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="john",
enable_ssl_auth=False,
cert_name="client1",
)
def test_https_non_ssl_auth():
# Users with non-SSL authentication are allowed, in this case we can skip sending a client certificate at all (because "verificationMode" is set to "relaxed").
# assert execute_query_https("SELECT currentUser()", user="peter", enable_ssl_auth=False) == "peter\n"
assert (
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
)
== "jane\n"
)
# But we still can send a certificate if we want.
assert (
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="client1",
)
== "peter\n"
)
assert (
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="client2",
)
== "peter\n"
)
assert (
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="client3",
)
== "peter\n"
)
assert (
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="client1",
)
== "jane\n"
)
assert (
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="client2",
)
== "jane\n"
)
assert (
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="client3",
)
== "jane\n"
)
# However if we send a certificate it must not be wrong.
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="wrong",
)
assert "unknown ca" in str(err.value)
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="wrong",
)
assert "unknown ca" in str(err.value)
def test_create_user():
instance.query("CREATE USER emma IDENTIFIED WITH ssl_certificate CN 'client3'")
assert (
execute_query_https("SELECT currentUser()", user="emma", cert_name="client3")
== "emma\n"
)
assert (
instance.query("SHOW CREATE USER emma")
== "CREATE USER emma IDENTIFIED WITH ssl_certificate CN \\'client3\\'\n"
)
instance.query("ALTER USER emma IDENTIFIED WITH ssl_certificate CN 'client2'")
assert (
execute_query_https("SELECT currentUser()", user="emma", cert_name="client2")
== "emma\n"
)
assert (
instance.query("SHOW CREATE USER emma")
== "CREATE USER emma IDENTIFIED WITH ssl_certificate CN \\'client2\\'\n"
)
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="emma", cert_name="client3")
assert "403" in str(err.value)
assert (
instance.query("SHOW CREATE USER lucy")
== "CREATE USER lucy IDENTIFIED WITH ssl_certificate CN \\'client2\\', \\'client3\\'\n"
)
assert (
instance.query(
"SELECT name, auth_type, auth_params FROM system.users WHERE name IN ['emma', 'lucy'] ORDER BY name"
)
== 'emma\tssl_certificate\t{"common_names":["client2"]}\n'
'lucy\tssl_certificate\t{"common_names":["client2","client3"]}\n'
)