Try to fix integration test 'test_ssl_cert_authentication'

This commit is contained in:
Nikolay Degterinsky 2023-05-07 18:34:34 +00:00
parent df8c930be3
commit c841a187ab

View File

@ -164,19 +164,26 @@ def get_ssl_context(cert_name):
def execute_query_https(
query, user, enable_ssl_auth=True, cert_name=None, password=None
):
url = (
f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}"
)
request = urllib.request.Request(url)
request.add_header("X-ClickHouse-User", user)
if enable_ssl_auth:
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on")
if password:
request.add_header("X-ClickHouse-Key", password)
response = urllib.request.urlopen(
request, context=get_ssl_context(cert_name)
).read()
return response.decode("utf-8")
retries = 10
while True:
try:
url = (
f"https://{instance.ip_address}:{HTTPS_PORT}/?query={urllib.parse.quote(query)}"
)
request = urllib.request.Request(url)
request.add_header("X-ClickHouse-User", user)
if enable_ssl_auth:
request.add_header("X-ClickHouse-SSL-Certificate-Auth", "on")
if password:
request.add_header("X-ClickHouse-Key", password)
response = urllib.request.urlopen(
request, context=get_ssl_context(cert_name)
).read()
return response.decode("utf-8")
except BrokenPipeError:
retries -= 1
if retries == 0:
raise
def test_https():