mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 11:32:03 +00:00
0ce30ab6d5
Added new type of authentication based on SSH keys. It works only for Native TCP protocol. Co-authored-by: Nikita Mikhaylov <nikitamikhaylov@clickhouse.com> Co-authored-by: Robert Schulze <robert@clickhouse.com>
104 lines
2.3 KiB
Python
104 lines
2.3 KiB
Python
import pytest
|
|
import os
|
|
from helpers.cluster import ClickHouseCluster
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
instance = cluster.add_instance(
|
|
"node",
|
|
user_configs=["configs/users.xml"],
|
|
)
|
|
|
|
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
|
|
|
|
|
|
@pytest.fixture(scope="module", autouse=True)
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
yield cluster
|
|
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_ecdsa():
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="john",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/ecdsa",
|
|
"ssh-key-passphrase": "",
|
|
},
|
|
)
|
|
== "john\n"
|
|
)
|
|
|
|
|
|
def test_ed25519():
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="john",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/ed25519",
|
|
"ssh-key-passphrase": "",
|
|
},
|
|
)
|
|
== "john\n"
|
|
)
|
|
|
|
|
|
def test_rsa():
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="john",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/rsa",
|
|
"ssh-key-passphrase": "",
|
|
},
|
|
)
|
|
== "john\n"
|
|
)
|
|
|
|
|
|
def test_wrong_key():
|
|
with pytest.raises(Exception) as err:
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="john",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/wrong",
|
|
"ssh-key-passphrase": "",
|
|
},
|
|
)
|
|
|
|
assert "Authentication failed" in str(err.value)
|
|
|
|
|
|
def test_key_with_passphrase():
|
|
assert (
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="lucy",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/passphrase",
|
|
"ssh-key-passphrase": "passphrase",
|
|
},
|
|
)
|
|
== "lucy\n"
|
|
)
|
|
|
|
|
|
def test_key_with_wrong_passphrase():
|
|
with pytest.raises(Exception):
|
|
instance.query(
|
|
"SELECT currentUser()",
|
|
user="lucy",
|
|
settings={
|
|
"ssh-key-file": f"{SCRIPT_DIR}/keys/passphrase",
|
|
"ssh-key-passphrase": "wrong",
|
|
},
|
|
) == "lucy\n"
|