Updated test_ssl_cert_authentication to use urllib3

This commit is contained in:
Smita Kulkarni 2023-05-18 09:20:38 +02:00
parent 3d26232cc0
commit 2893f14c54

View File

@ -2,10 +2,10 @@ import pytest
from helpers.client import Client from helpers.client import Client
from helpers.cluster import ClickHouseCluster from helpers.cluster import ClickHouseCluster
from helpers.ssl_context import WrapSSLContextWithSNI from helpers.ssl_context import WrapSSLContextWithSNI
import urllib.request, urllib.parse
import ssl import ssl
import os.path import os.path
from os import remove from os import remove
import urllib3
# The test cluster is configured with certificate for that host name, see 'server-ext.cnf'. # The test cluster is configured with certificate for that host name, see 'server-ext.cnf'.
@ -164,24 +164,22 @@ def get_ssl_context(cert_name):
def execute_query_https( def execute_query_https(
query, user, enable_ssl_auth=True, cert_name=None, password=None query, user, enable_ssl_auth=True, cert_name=None, password=None
): ):
retries = 10 try:
while True: url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={query}"
try: headers = {"X-ClickHouse-User":user}
url = f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}" if enable_ssl_auth:
request = urllib.request.Request(url) headers["X-ClickHouse-SSL-Certificate-Auth"] = "on"
request.add_header("X-ClickHouse-User", user) if password:
if enable_ssl_auth: headers["X-ClickHouse-Key"] = password
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on") http_client=urllib3.PoolManager(ssl_context=get_ssl_context(cert_name))
if password: response = http_client.request('GET', url, headers=headers)
request.add_header("X-ClickHouse-Key", password) if response.status != 200:
response = urllib.request.urlopen( raise Exception(response.status)
request, context=get_ssl_context(cert_name) return response.data.decode("utf-8")
).read() except:
return response.decode("utf-8") raise
except BrokenPipeError:
retries -= 1
if retries == 0:
raise
def test_https(): def test_https():
@ -203,7 +201,7 @@ def test_https_wrong_cert():
# Wrong certificate: different user's certificate # Wrong certificate: different user's certificate
with pytest.raises(Exception) as err: with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john", cert_name="client2") execute_query_https("SELECT currentUser()", user="john", cert_name="client2")
assert "HTTP Error 403" in str(err.value) assert "403" in str(err.value)
# Wrong certificate: self-signed certificate. # Wrong certificate: self-signed certificate.
with pytest.raises(Exception) as err: with pytest.raises(Exception) as err:
@ -213,7 +211,7 @@ def test_https_wrong_cert():
# No certificate. # No certificate.
with pytest.raises(Exception) as err: with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="john") execute_query_https("SELECT currentUser()", user="john")
assert "HTTP Error 403" in str(err.value) assert "403" in str(err.value)
# No header enabling SSL authentication. # No header enabling SSL authentication.
with pytest.raises(Exception) as err: with pytest.raises(Exception) as err:
@ -341,7 +339,7 @@ def test_create_user():
with pytest.raises(Exception) as err: with pytest.raises(Exception) as err:
execute_query_https("SELECT currentUser()", user="emma", cert_name="client3") execute_query_https("SELECT currentUser()", user="emma", cert_name="client3")
assert "HTTP Error 403" in str(err.value) assert "403" in str(err.value)
assert ( assert (
instance.query("SHOW CREATE USER lucy") instance.query("SHOW CREATE USER lucy")