mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
135 lines
4.0 KiB
Python
135 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
|
|
import logging
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import subprocess
|
|
import tempfile
|
|
|
|
|
|
class SSHAgent:
|
|
def __init__(self):
|
|
self._env = {}
|
|
self._env_backup = {}
|
|
self._keys = {}
|
|
self.start()
|
|
|
|
@property
|
|
def pid(self):
|
|
return int(self._env["SSH_AGENT_PID"])
|
|
|
|
def start(self):
|
|
if shutil.which("ssh-agent") is None:
|
|
raise RuntimeError("ssh-agent binary is not available")
|
|
|
|
self._env_backup["SSH_AUTH_SOCK"] = os.environ.get("SSH_AUTH_SOCK")
|
|
self._env_backup["SSH_OPTIONS"] = os.environ.get("SSH_OPTIONS")
|
|
|
|
# set ENV from stdout of ssh-agent
|
|
for line in self._run(["ssh-agent"]).splitlines():
|
|
name, _, value = line.partition(b"=")
|
|
if _ == b"=":
|
|
value = value.split(b";", 1)[0]
|
|
self._env[name.decode()] = value.decode()
|
|
os.environ[name.decode()] = value.decode()
|
|
|
|
ssh_options = (
|
|
"," + os.environ["SSH_OPTIONS"] if os.environ.get("SSH_OPTIONS") else ""
|
|
)
|
|
os.environ[
|
|
"SSH_OPTIONS"
|
|
] = f"{ssh_options}UserKnownHostsFile=/dev/null,StrictHostKeyChecking=no"
|
|
|
|
def add(self, key):
|
|
key_pub = self._key_pub(key)
|
|
|
|
if key_pub in self._keys:
|
|
self._keys[key_pub] += 1
|
|
else:
|
|
self._run(["ssh-add", "-"], stdin=key.encode())
|
|
self._keys[key_pub] = 1
|
|
|
|
return key_pub
|
|
|
|
def remove(self, key_pub):
|
|
if key_pub not in self._keys:
|
|
raise ValueError(f"Private key not found, public part: {key_pub}")
|
|
|
|
if self._keys[key_pub] > 1:
|
|
self._keys[key_pub] -= 1
|
|
else:
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(key_pub)
|
|
f.flush()
|
|
self._run(["ssh-add", "-d", f.name])
|
|
self._keys.pop(key_pub)
|
|
|
|
def print_keys(self):
|
|
keys = self._run(["ssh-add", "-l"]).splitlines()
|
|
if keys:
|
|
logging.info("ssh-agent keys:")
|
|
for key in keys:
|
|
logging.info("%s", key)
|
|
else:
|
|
logging.info("ssh-agent (pid %d) is empty", self.pid)
|
|
|
|
def kill(self):
|
|
for k, v in self._env.items():
|
|
os.environ.pop(k, None)
|
|
|
|
for k, v in self._env_backup.items():
|
|
if v is not None:
|
|
os.environ[k] = v
|
|
|
|
os.kill(self.pid, signal.SIGTERM)
|
|
|
|
def _key_pub(self, key):
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(key.encode())
|
|
f.flush()
|
|
return self._run(["ssh-keygen", "-y", "-f", f.name])
|
|
|
|
@staticmethod
|
|
def _run(cmd, stdin=None):
|
|
shell = isinstance(cmd, str)
|
|
with subprocess.Popen(
|
|
cmd,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
stdin=subprocess.PIPE if stdin else None,
|
|
shell=shell,
|
|
) as p:
|
|
stdout, stderr = p.communicate(stdin)
|
|
|
|
if stdout.strip().decode() == "The agent has no identities.":
|
|
return ""
|
|
|
|
if p.returncode:
|
|
message = stderr.strip() + b"\n" + stdout.strip()
|
|
raise RuntimeError(message.strip().decode())
|
|
|
|
return stdout
|
|
|
|
|
|
class SSHKey:
|
|
def __init__(self, key_name=None, key_value=None):
|
|
if key_name is None and key_value is None:
|
|
raise ValueError("Either key_name or key_value must be specified")
|
|
if key_name is not None and key_value is not None:
|
|
raise ValueError("key_name or key_value must be specified")
|
|
if key_name is not None:
|
|
self.key = os.getenv(key_name)
|
|
else:
|
|
self.key = key_value
|
|
self._key_pub = None
|
|
self._ssh_agent = SSHAgent()
|
|
|
|
def __enter__(self):
|
|
self._key_pub = self._ssh_agent.add(self.key)
|
|
self._ssh_agent.print_keys()
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self._ssh_agent.remove(self._key_pub)
|
|
self._ssh_agent.print_keys()
|