mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
121 lines
3.2 KiB
Python
121 lines
3.2 KiB
Python
import json
|
|
import logging
|
|
import os
|
|
import sys
|
|
|
|
import pytest
|
|
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
from .http_auth_server import GOOD_PASSWORD, USER_RESPONSES
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"node",
|
|
main_configs=["configs/config.xml"],
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
def run_echo_server():
|
|
tmp = instance.copy_file_to_container(
|
|
os.path.join(SCRIPT_DIR, "http_auth_server.py"),
|
|
"/http_auth_server.py",
|
|
)
|
|
|
|
tmp = instance.exec_in_container(
|
|
[
|
|
"bash",
|
|
"-c",
|
|
"python3 /http_auth_server.py > /http_auth_server.log 2>&1",
|
|
],
|
|
detach=True,
|
|
user="root",
|
|
)
|
|
|
|
for _ in range(0, 10):
|
|
ping_response = instance.exec_in_container(
|
|
["curl", "-s", f"http://localhost:8000/health"],
|
|
nothrow=True,
|
|
)
|
|
logging.debug(f"Reply1: {ping_response}")
|
|
if ping_response == "OK":
|
|
return
|
|
|
|
raise Exception("Echo server is not responding")
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
run_echo_server()
|
|
yield cluster
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_user_from_config_basic_auth_pass(started_cluster):
|
|
assert (
|
|
instance.query("SHOW CREATE USER good_user")
|
|
== "CREATE USER good_user IDENTIFIED WITH http SERVER \\'basic_server\\' SCHEME \\'BASIC\\' SETTINGS PROFILE `default`\n"
|
|
)
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()", user="good_user", password="good_password"
|
|
)
|
|
== "good_user\n"
|
|
)
|
|
|
|
|
|
def test_user_create_basic_auth_pass(started_cluster):
|
|
instance.query(
|
|
"CREATE USER basic_user IDENTIFIED WITH HTTP SERVER 'basic_server' SCHEME 'BASIC'"
|
|
)
|
|
|
|
assert (
|
|
instance.query("SHOW CREATE USER basic_user")
|
|
== "CREATE USER basic_user IDENTIFIED WITH http SERVER \\'basic_server\\' SCHEME \\'BASIC\\'\n"
|
|
)
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()", user="basic_user", password=GOOD_PASSWORD
|
|
)
|
|
== "basic_user\n"
|
|
)
|
|
|
|
instance.query("DROP USER basic_user")
|
|
|
|
|
|
def test_basic_auth_failed(started_cluster):
|
|
assert "good_user: Authentication failed" in instance.query_and_get_error(
|
|
"SELECT currentUser()", user="good_user", password="bad_password"
|
|
)
|
|
|
|
|
|
def test_session_settings_from_auth_response(started_cluster):
|
|
for user, response in USER_RESPONSES.items():
|
|
query_id = f"test_query_{user}"
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user=user,
|
|
password="good_password",
|
|
query_id=query_id,
|
|
)
|
|
== f"{user}\n"
|
|
)
|
|
instance.query("SYSTEM FLUSH LOGS")
|
|
|
|
res = instance.query(
|
|
f"select Settings from system.query_log where type = 'QueryFinish' and query_id = '{query_id}' FORMAT JSON"
|
|
)
|
|
|
|
res = json.loads(res)
|
|
query_settings = res["data"][0]["Settings"]
|
|
|
|
if isinstance(response, dict):
|
|
for key, value in response.get("settings", {}).items():
|
|
assert query_settings.get(key) == value
|