# -*- coding: utf-8 -*-
import io
import gzip
import subprocess
import time
from tempfile import NamedTemporaryFile
import requests
import requests_kerberos as reqkerb
import socket
import tempfile
import logging
import os


class mk_krb_conf(object):
    """Context manager producing a krb5 config that points at a concrete KDC.

    On entry the template at ``krb_conf`` is read, every occurrence of the
    placeholder host name ``hdfskerberos`` is replaced with ``kdc_ip``, and the
    result is written to a new temporary file whose path is returned.

    The temporary file is created with ``delete=False`` and is not removed on
    exit: ``HDFSApi._run_kinit`` exports its path through ``KRB5_CONFIG``,
    which outlives the ``with`` block.
    """

    def __init__(self, krb_conf, kdc_ip):
        self.krb_conf = krb_conf
        self.kdc_ip = kdc_ip
        # File object of the generated config; None until __enter__ has run.
        self.amended_krb_conf = None

    def __enter__(self):
        """Write the amended config and return the path of the new file."""
        with open(self.krb_conf) as template:
            amended_content = template.read().replace("hdfskerberos", self.kdc_ip)

        # The ``with`` guarantees the handle is closed even if the write fails;
        # because of delete=False the file itself stays on disk.
        with tempfile.NamedTemporaryFile(delete=False, mode="w+") as amended:
            amended.write(amended_content)
        self.amended_krb_conf = amended
        return amended.name

    def __exit__(self, exc_type, exc_value, exc_traceback):
        # The handle is already closed in __enter__; close() on a closed file
        # is a no-op and is kept only as a safety net.
        if self.amended_krb_conf is not None:
            self.amended_krb_conf.close()


class HDFSApi(object):
    """Small WebHDFS client with optional Kerberos (SPNEGO) authentication.

    Reads and writes follow the two-step WebHDFS flow: the first request goes
    to ``hdfs_ip:proxy_port`` and is expected to answer with a 307 redirect;
    the host part of the redirect is rewritten to ``hdfs_ip:data_port`` and the
    payload is then transferred with a second request.
    """

    # Host:port pairs expected in the redirect ``Location`` header; they are
    # replaced by ``hdfs_ip:data_port`` before following the redirect.
    _KERBERIZED_DATANODE = "kerberizedhdfs1:1006"
    _PLAIN_DATANODE = "hdfs1:50075"

    def __init__(
        self,
        user,
        host,
        proxy_port,
        data_port,
        timeout=100,
        kerberized=False,
        principal=None,
        keytab=None,
        krb_conf=None,
        protocol="http",
        hdfs_ip=None,
        kdc_ip=None,
    ):
        """Store connection settings and, if ``kerberized``, run ``kinit``.

        Args:
            user: value sent as ``user.name`` when uploading data.
            host: host name used as ``hostname_override`` for Kerberos auth.
            proxy_port: port receiving the initial WebHDFS request.
            data_port: port substituted into the redirect location.
            timeout: seconds to keep retrying ``kinit``.
            kerberized: enable Kerberos authentication.
            principal: Kerberos principal (required when ``kerberized``).
            keytab: path to the keytab (required when ``kerberized``).
            krb_conf: path to the krb5 config template.
            protocol: URL scheme of the initial request.
            hdfs_ip: address all requests are sent to.
            kdc_ip: address substituted into the krb5 config template.

        Raises:
            Exception: if ``kerberized`` is set and ``kinit`` cannot be run
                successfully within ``timeout`` seconds, or principal/keytab
                are missing.
        """
        self.host = host
        self.protocol = protocol
        self.proxy_port = proxy_port
        self.data_port = data_port
        self.user = user
        self.kerberized = kerberized
        self.principal = principal
        self.keytab = keytab
        self.timeout = timeout
        self.hdfs_ip = hdfs_ip
        self.kdc_ip = kdc_ip
        self.krb_conf = krb_conf

        # NOTE: to debug authentication problems, raise the level of the
        # "requests_kerberos" and "requests.packages.urllib3" loggers.
        if kerberized:
            self._run_kinit()
            self.kerberos_auth = reqkerb.HTTPKerberosAuth(
                mutual_authentication=reqkerb.DISABLED,
                hostname_override=self.host,
                principal=self.principal,
            )
        else:
            self.kerberos_auth = None

    def _run_kinit(self):
        """Obtain a Kerberos ticket with ``kinit``, retrying until ``timeout``.

        Sets ``KRB5_CONFIG`` in ``os.environ`` to a generated config file and
        leaves it set after returning.

        Raises:
            Exception: if principal or keytab is missing, or if no attempt
                succeeds within ``self.timeout`` seconds.
        """
        if self.principal is None or self.keytab is None:
            raise Exception("kerberos principal and keytab are required")

        with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
            logging.debug("instantiated_krb_conf {}".format(instantiated_krb_conf))

            os.environ["KRB5_CONFIG"] = instantiated_krb_conf

            cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(
                keytab=self.keytab,
                principal=self.principal,
            )

            deadline = time.monotonic() + self.timeout
            while time.monotonic() < deadline:
                try:
                    # Output is captured so it can be logged; without the pipes
                    # ``res.stdout`` / ``res.stderr`` would be None.
                    res = subprocess.run(
                        cmd,
                        shell=True,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE,
                    )
                except (OSError, subprocess.SubprocessError) as ex:
                    logging.debug("Can't run kinit ... waiting {}".format(str(ex)))
                else:
                    stdout = res.stdout.decode("utf-8", errors="replace")
                    stderr = res.stderr.decode("utf-8", errors="replace")
                    logging.debug("Stderr:\n{}\n".format(stderr))
                    logging.debug("Stdout:\n{}\n".format(stdout))
                    if res.returncode == 0:
                        logging.debug("KDC started, kinit successfully run")
                        return
                    logging.debug(
                        "Env:\nKRB5_CONFIG={}\n".format(instantiated_krb_conf)
                    )
                    failure = "Command {} return non-zero code {}: {}".format(
                        cmd, res.returncode, stderr
                    )
                    logging.debug("Can't run kinit ... waiting {}".format(failure))
                time.sleep(1)

        raise Exception("Kinit running failure")

    @staticmethod
    def req_wrapper(func, expected_code, cnt=2, **kwargs):
        """Call ``func(**kwargs)`` until it answers with ``expected_code``.

        Args:
            func: callable returning a ``requests`` response
                (e.g. ``requests.get``).
            expected_code: HTTP status code that counts as success.
            cnt: maximum number of attempts; one second is slept between
                attempts.
            **kwargs: forwarded unchanged to ``func``.

        Returns:
            The first response whose status code equals ``expected_code``.

        Raises:
            ValueError: if ``cnt`` is less than 1.
            requests.exceptions.HTTPError: if no attempt produced
                ``expected_code``. This is raised even when the last status is
                not a 4xx/5xx, so callers never receive ``None``.
        """
        if cnt < 1:
            raise ValueError("cnt must be at least 1, got {}".format(cnt))

        response_data = None
        for attempt in range(cnt):
            if attempt:
                time.sleep(1)
            logging.debug(f"CALL: {str(kwargs)}")
            response_data = func(**kwargs)
            logging.debug(
                f"response_data:{response_data.content} headers:{response_data.headers}"
            )
            if response_data.status_code == expected_code:
                return response_data
            logging.error(
                f"unexpected response_data.status_code {response_data.status_code} != {expected_code}"
            )

        # Raises for 4xx/5xx answers, keeping the standard requests message.
        response_data.raise_for_status()
        # Any other mismatch (e.g. 200 where 307 was expected) is still a
        # failure and must not be returned silently as None.
        raise requests.exceptions.HTTPError(
            f"unexpected response_data.status_code {response_data.status_code} != {expected_code}",
            response=response_data,
        )

    def _datanode_location(self, response):
        """Return the redirect target of ``response`` aimed at ``hdfs_ip:data_port``."""
        internal = (
            self._KERBERIZED_DATANODE if self.kerberized else self._PLAIN_DATANODE
        )
        return response.headers["Location"].replace(
            internal, "{}:{}".format(self.hdfs_ip, self.data_port)
        )

    def read_data(self, path, universal_newlines=True):
        """Read the file at ``path`` through WebHDFS.

        Args:
            path: HDFS path appended to ``/webhdfs/v1``.
            universal_newlines: return decoded text when true, raw bytes
                otherwise.

        Returns:
            ``str`` if ``universal_newlines`` else ``bytes``.

        Raises:
            requests.exceptions.HTTPError: if either request does not return
                the expected status (307, then 200).
        """
        logging.debug(
            "read_data protocol:{} host:{} ip:{} proxy port:{} data port:{} path: {}".format(
                self.protocol,
                self.host,
                self.hdfs_ip,
                self.proxy_port,
                self.data_port,
                path,
            )
        )
        response = self.req_wrapper(
            requests.get,
            307,
            url="{protocol}://{ip}:{port}/webhdfs/v1{path}?op=OPEN".format(
                protocol=self.protocol, ip=self.hdfs_ip, port=self.proxy_port, path=path
            ),
            headers={"host": str(self.hdfs_ip)},
            allow_redirects=False,
            verify=False,
            auth=self.kerberos_auth,
        )

        location = self._datanode_location(response)
        logging.debug("redirected to {}".format(location))

        response_data = self.req_wrapper(
            requests.get,
            200,
            url=location,
            headers={"host": str(self.hdfs_ip)},
            verify=False,
            auth=self.kerberos_auth,
        )
        if universal_newlines:
            return response_data.text
        return response_data.content

    def write_data(self, path, content):
        """Create (or overwrite) the file at ``path`` with ``content``.

        Args:
            path: HDFS path appended to ``/webhdfs/v1``.
            content: ``str`` (encoded with the default codec) or a bytes-like
                object.

        Raises:
            requests.exceptions.HTTPError: if either request does not return
                the expected status (307, then 201).
        """
        logging.debug(
            "write_data protocol:{} host:{} port:{} path: {} user:{}, principal:{}".format(
                self.protocol,
                self.host,
                self.proxy_port,
                path,
                self.user,
                self.principal,
            )
        )
        if isinstance(content, str):
            content = content.encode()
        # Normalise any bytes-like object to ``bytes``; the payload is sent
        # straight from memory, no temporary file is needed.
        file_data = bytes(content)

        response = self.req_wrapper(
            requests.put,
            307,
            url="{protocol}://{ip}:{port}/webhdfs/v1{path}?op=CREATE".format(
                protocol=self.protocol,
                ip=self.hdfs_ip,
                port=self.proxy_port,
                path=path,
            ),
            allow_redirects=False,
            headers={"host": str(self.hdfs_ip)},
            params={"overwrite": "true"},
            verify=False,
            auth=self.kerberos_auth,
        )

        logging.debug("HDFS api response:{}".format(response.headers))

        location = self._datanode_location(response)

        response = self.req_wrapper(
            requests.put,
            201,
            url=location,
            data=file_data,
            headers={"content-type": "text/plain", "host": str(self.hdfs_ip)},
            params={"file": path, "user.name": self.user},
            allow_redirects=False,
            verify=False,
            auth=self.kerberos_auth,
        )

        logging.debug(f"{response.content} {response.headers}")

    def write_gzip_data(self, path, content):
        """Gzip-compress ``content`` (``str`` or ``bytes``) and write it to ``path``."""
        if isinstance(content, str):
            content = content.encode()
        buffer = io.BytesIO()
        with gzip.GzipFile(fileobj=buffer, mode="wb") as gz:
            gz.write(content)
        self.write_data(path, buffer.getvalue())

    def read_gzip_data(self, path):
        """Read the gzip file at ``path`` and return its decompressed text."""
        compressed = self.read_data(path, universal_newlines=False)
        return gzip.decompress(compressed).decode()