Make test_ssl_cert_authentication similar to test_tlvs1_3

This commit is contained in:
János Benjamin Antal 2023-06-28 10:24:54 +00:00
parent e2241b8c7c
commit 451694d8b6

View File

@ -2,10 +2,10 @@ import pytest
from helpers.client import Client
from helpers.cluster import ClickHouseCluster
from helpers.ssl_context import WrapSSLContextWithSNI
import urllib.request, urllib.parse
import ssl
import os.path
from os import remove
import urllib3
# The test cluster is configured with certificate for that host name, see 'server-ext.cnf'.
@ -14,6 +14,7 @@ SSL_HOST = "integration-tests.clickhouse.com"
HTTPS_PORT = 8443
# It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
MAX_RETRY = 5
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance(
@ -164,17 +165,19 @@ def get_ssl_context(cert_name):
def execute_query_https(
query, user, enable_ssl_auth=True, cert_name=None, password=None
):
url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={query}"
headers = {"X-ClickHouse-User": user}
url = (
f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}"
)
request = urllib.request.Request(url)
request.add_header("X-ClickHouse-User", user)
if enable_ssl_auth:
headers["X-ClickHouse-SSL-Certificate-Auth"] = "on"
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on")
if password:
headers["X-ClickHouse-Key"] = password
http_client = urllib3.PoolManager(ssl_context=get_ssl_context(cert_name))
response = http_client.request("GET", url, headers=headers)
if response.status != 200:
raise Exception(response.status)
return response.data.decode("utf-8")
request.add_header("X-ClickHouse-Key", password)
response = urllib.request.urlopen(
request, context=get_ssl_context(cert_name)
).read()
return response.decode("utf-8")
def test_https():
@ -198,10 +201,18 @@ def test_https_wrong_cert():
execute_query_https("SELECT currentUser()", user="john", cert_name="client2")
assert "403" in str(err.value)
count = 0
# Wrong certificate: self-signed certificate.
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="wrong")
assert "unknown ca" in str(err.value)
while count <= MAX_RETRY:
with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="wrong")
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
count = count + 1
logging.warning(f"Failed attempt with wrong cert, err: {err_str}")
continue
assert "unknown ca" in err_str
break
# No certificate.
with pytest.raises(Exception) as err:
@ -291,24 +302,45 @@ def test_https_non_ssl_auth():
== "jane\n"
)
count = 0
# However if we send a certificate it must not be wrong.
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="wrong",
)
assert "unknown ca" in str(err.value)
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="wrong",
)
assert "unknown ca" in str(err.value)
while count <= MAX_RETRY:
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="peter",
enable_ssl_auth=False,
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: peter, err: {err_str}"
)
continue
assert "unknown ca" in err_str
break
count = 0
while count <= MAX_RETRY:
with pytest.raises(Exception) as err:
execute_query_https(
"SELECT currentUser()",
user="jane",
enable_ssl_auth=False,
password="qwe123",
cert_name="wrong",
)
err_str = str(err.value)
if count < MAX_RETRY and "Broken pipe" in err_str:
count = count + 1
logging.warning(
f"Failed attempt with wrong cert, user: jane, err: {err_str}"
)
continue
assert "unknown ca" in err_str
break
def test_create_user():