Yatsishin Ilya 2021-02-19 17:42:43 +03:00
parent ee955038c1
commit d45cab4228
4 changed files with 77 additions and 31 deletions

@@ -8,13 +8,16 @@ services:
hostname: kerberizedhdfs1
restart: always
volumes:
- ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro
- ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
- ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro
- type: ${KERBERIZED_HDFS_FS:-tmpfs}
source: ${KERBERIZED_HDFS_LOGS:-}
target: /var/log/hadoop-hdfs
ports:
- 1006:1006
- 50070:50070
- 9010:9010
- ${KERBERIZED_HDFS_NAME_EXTERNAL_PORT}:${KERBERIZED_HDFS_NAME_INTERNAL_PORT} #50070
- ${KERBERIZED_HDFS_DATA_EXTERNAL_PORT}:${KERBERIZED_HDFS_DATA_INTERNAL_PORT} #1006
# - 9010:9010
depends_on:
- hdfskerberos
entrypoint: /etc/bootstrap.sh -d
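
The fixed host ports (50070 for the namenode HTTP endpoint, 1006 for the datanode) are replaced with values taken from the environment, so several test clusters can share one machine without clashing on hardcoded ports. A minimal sketch of the variables cluster.py exports for this compose file; the external port numbers are only illustrative, in practice they come from get_open_port():

# Illustrative values only; the real external ports are picked at runtime.
env_variables = {
    'KERBERIZED_HDFS_HOST': 'kerberizedhdfs1',
    'KERBERIZED_HDFS_NAME_EXTERNAL_PORT': '32770',   # mapped to 50070 inside the container
    'KERBERIZED_HDFS_NAME_INTERNAL_PORT': '50070',
    'KERBERIZED_HDFS_DATA_EXTERNAL_PORT': '32771',   # mapped to 1006 inside the container
    'KERBERIZED_HDFS_DATA_INTERNAL_PORT': '1006',
    'KERBERIZED_HDFS_LOGS': '/path/to/instances/kerberized_hdfs/logs',
    'KERBERIZED_HDFS_FS': 'bind',
}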

@@ -184,6 +184,13 @@ class ClickHouseCluster:
self.hdfs_dir = p.abspath(p.join(self.instances_dir, "hdfs"))
self.hdfs_logs_dir = os.path.join(self.hdfs_dir, "logs")
# available when with_kerberized_hdfs == True
self.hdfs_kerberized_host = "kerberizedhdfs1"
self.hdfs_kerberized_name_port = get_open_port()
self.hdfs_kerberized_data_port = get_open_port()
self.hdfs_kerberized_dir = p.abspath(p.join(self.instances_dir, "kerberized_hdfs"))
self.hdfs_kerberized_logs_dir = os.path.join(self.hdfs_kerberized_dir, "logs")
# available when with_kafka == True
self.kafka_host = "kafka1"
self.kafka_port = get_open_port()
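
Each service now gets its host-side ports from get_open_port() instead of a hardcoded value. The helper itself is not shown in this diff; a minimal sketch of what it is assumed to do (bind to port 0 and let the OS pick a free TCP port):

# Not part of this diff: an assumed implementation of get_open_port().
import socket

def get_open_port():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(("", 0))            # port 0 lets the kernel choose any free port
        return sock.getsockname()[1]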
@@ -276,6 +283,21 @@ class ClickHouseCluster:
print("HDFS BASE CMD:{}".format(self.base_hdfs_cmd))
return self.base_hdfs_cmd
def setup_kerberized_hdfs_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_kerberized_hdfs = True
env_variables['KERBERIZED_HDFS_HOST'] = self.hdfs_kerberized_host
env_variables['KERBERIZED_HDFS_NAME_EXTERNAL_PORT'] = str(self.hdfs_kerberized_name_port)
env_variables['KERBERIZED_HDFS_NAME_INTERNAL_PORT'] = "50070"
env_variables['KERBERIZED_HDFS_DATA_EXTERNAL_PORT'] = str(self.hdfs_kerberized_data_port)
env_variables['KERBERIZED_HDFS_DATA_INTERNAL_PORT'] = "1006"
env_variables['KERBERIZED_HDFS_LOGS'] = self.hdfs_kerberized_logs_dir
env_variables['KERBERIZED_HDFS_FS'] = "bind"
env_variables['KERBERIZED_HDFS_DIR'] = instance.path + '/'
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
self.base_kerberized_hdfs_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
return self.base_kerberized_hdfs_cmd
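
setup_kerberized_hdfs_cmd stores the docker-compose base command in base_kerberized_hdfs_cmd and returns it; the start-up code further down runs it with common_opts. A rough illustration with placeholder paths and project name, and ['up', '-d'] standing in for common_opts (an assumption, not shown in this diff):

# Placeholder paths; ['up', '-d'] is assumed for common_opts.
base_cmd = ['docker-compose', '--env-file', '/instances/node1/.env',
            '--project-name', 'roottestkerberizedhdfs',
            '--file', '/compose/docker_compose_kerberized_hdfs.yml']
run_and_check(base_cmd + ['up', '-d'])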
def setup_kafka_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_kafka = True
env_variables['KAFKA_HOST'] = self.kafka_host
@@ -345,7 +367,7 @@ class ClickHouseCluster:
self.with_minio = True
cert_d = p.join(self.minio_dir, "certs")
env_variables['MINIO_CERTS_DIR'] = cert_d
env_variables['MINIO_EXTERNAL_PORT'] = self.minio_port
env_variables['MINIO_EXTERNAL_PORT'] = str(self.minio_port)
env_variables['MINIO_INTERNAL_PORT'] = "9001"
env_variables['SSL_CERT_FILE'] = p.join(self.base_dir, cert_d, 'public.crt')
@@ -473,12 +495,7 @@ class ClickHouseCluster:
cmds.append(self.setup_hdfs_cmd(instance, env_variables, docker_compose_yml_dir))
if with_kerberized_hdfs and not self.with_kerberized_hdfs:
self.with_kerberized_hdfs = True
env_variables['KERBERIZED_HDFS_DIR'] = instance.path + '/'
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
self.base_kerberized_hdfs_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
cmds.append(self.base_kerberized_hdfs_cmd)
cmds.append(self.setup_kerberized_hdfs_cmd(instance, env_variables, docker_compose_yml_dir))
if with_mongo and not self.with_mongo:
cmds.append(self.setup_mongo_cmd(instance, env_variables, docker_compose_yml_dir))
@@ -672,10 +689,10 @@ class ClickHouseCluster:
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
host="kerberizedhdfs1",
host="localhost",
protocol="http",
proxy_port=50070,
data_port=1006,
proxy_port=self.hdfs_kerberized_name_port,
data_port=self.hdfs_kerberized_data_port,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
@@ -847,9 +864,10 @@ class ClickHouseCluster:
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
logging.debug('Setup kerberized HDFS')
os.makedirs(self.hdfs_kerberized_logs_dir)
run_and_check(self.base_kerberized_hdfs_cmd + common_opts)
hdfs_api = self.make_hdfs_api(kerberized=True)
self.wait_hdfs_to_start(hdfs_api, timeout=300)
self.wait_hdfs_to_start(hdfs_api, timeout=30)
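
The kerberized cluster reuses wait_hdfs_to_start, now with a 30-second timeout, to block until WebHDFS answers through the newly mapped ports. The helper is not part of this diff; it is assumed to poll the API until a trivial write succeeds, roughly:

# Assumed behaviour of wait_hdfs_to_start (not shown in this diff); the probe path is hypothetical.
import logging
import time

def wait_hdfs_to_start(hdfs_api, timeout=30):
    start = time.time()
    while time.time() - start < timeout:
        try:
            hdfs_api.write_data("/somefilewithrandomname222", "1")  # trivial write as a readiness probe
            return
        except Exception as ex:
            logging.debug("Can't connect to HDFS: {}".format(ex))
            time.sleep(1)
    raise Exception("Can't wait HDFS to start")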
if self.with_mongo and self.base_mongo_cmd:
logging.debug('Setup Mongo')

@@ -10,7 +10,6 @@ import socket
import tempfile
import logging
import os
class mk_krb_conf(object):
def __init__(self, krb_conf, kdc_ip):
self.krb_conf = krb_conf
@@ -51,6 +50,9 @@ class HDFSApi(object):
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
kerb_log = logging.getLogger("requests_kerberos")
kerb_log.setLevel(logging.DEBUG)
kerb_log.propagate = True
if kerberized:
self._run_kinit()
@@ -66,23 +68,28 @@ class HDFSApi(object):
raise Exception("kerberos principal and keytab are required")
with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
logging.debug("instantiated_krb_conf ", instantiated_krb_conf)
logging.debug("instantiated_krb_conf {}".format(instantiated_krb_conf))
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
logging.debug(cmd)
start = time.time()
while time.time() - start < self.timeout:
try:
subprocess.call(cmd, shell=True)
print("KDC started, kinit successfully run")
res = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
if res.returncode != 0:
    # subprocess.run(...) does not print the captured output, so we log it manually
    logging.debug('Stderr:\n{}\n'.format(res.stderr.decode('utf-8')))
    logging.debug('Stdout:\n{}\n'.format(res.stdout.decode('utf-8')))
    logging.debug('Env:\n{}\n'.format(os.environ))
    raise Exception('Command {} returned non-zero code {}: {}'.format(cmd, res.returncode, res.stderr.decode('utf-8')))
logging.debug("KDC started, kinit successfully run")
return
except Exception as ex:
print("Can't run kinit ... waiting {}".format(str(ex)))
logging.debug("Can't run kinit ... waiting {}".format(str(ex)))
time.sleep(1)
raise Exception("Kinit running failure")
@@ -93,9 +100,13 @@ class HDFSApi(object):
if response.status_code != 307:
response.raise_for_status()
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port)))
logging.debug("redirected to {}".format(url))
response_data = requests.get(url, headers={'host': 'localhost'},
location = None
if self.kerberized:
location = response.headers['Location'].replace("kerberizedhdfs1:1006", "{}:{}".format(self.host, self.data_port))
else:
location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))
logging.debug("redirected to {}".format(location))
response_data = requests.get(location, headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
if response_data.status_code != 200:
response_data.raise_for_status()
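
WebHDFS answers the read request with a 307 redirect whose Location points at the datanode's internal hostname and port (kerberizedhdfs1:1006 here, hdfs1:50075 for the plain cluster), so the test rewrites that pair to the externally mapped host and port before following it. A hypothetical, more generic way to do the same rewrite; the code above keeps the explicit replace() calls instead:

# Hypothetical alternative: rewrite the netloc instead of string-replacing a known host:port.
from urllib.parse import urlparse, urlunparse

def rewrite_datanode_location(location, external_host, external_port):
    parsed = urlparse(location)
    return urlunparse(parsed._replace(netloc="{}:{}".format(external_host, external_port)))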
@@ -116,7 +127,6 @@ class HDFSApi(object):
if self.kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
logging.debug(self.kerberos_auth)
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host='localhost',
@@ -135,8 +145,11 @@ class HDFSApi(object):
# additional_params = '&'.join(
# response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))
if self.kerberized:
location = response.headers['Location'].replace("kerberizedhdfs1:1006", "{}:{}".format(self.host, self.data_port))
else:
location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))
with open(fpath, mode="rb") as fh:
file_data = fh.read()
protocol = "http" # self.protocol

@@ -142,12 +142,23 @@ if __name__ == "__main__":
action="append",
help="Set non-default tags for images used in docker compose recipes(yandex/my_container:my_tag)")
parser.add_argument(
"-n", "--parallel",
action="store",
dest="parallel",
help="Parallelism")
parser.add_argument('pytest_args', nargs='*', help="args for pytest command")
args = parser.parse_args()
check_args_and_update_paths(args)
parallel_args = ""
if args.parallel:
parallel_args += "--dist=loadfile"
parallel_args += "-n {}".format(args.parallel)
net = ""
if not args.disable_net_host:
net = "--net=host"
@@ -188,7 +199,7 @@ if __name__ == "__main__":
cmd = "docker run {net} {tty} --rm --name {name} --privileged --volume={bridge_bin}:/clickhouse-odbc-bridge --volume={bin}:/clickhouse \
--volume={base_cfg}:/clickhouse-config --volume={cases_dir}:/ClickHouse/tests/integration \
--volume={src_dir}/Server/grpc_protos:/ClickHouse/src/Server/grpc_protos \
--volume={name}_volume:/var/lib/docker {env_tags} -e PYTEST_OPTS='{opts}' {img} {command}".format(
--volume={name}_volume:/var/lib/docker {env_tags} -e PYTEST_OPTS='{parallel} {opts}' {img} {command}".format(
net=net,
tty=tty,
bin=args.binary,
@@ -197,6 +208,7 @@ if __name__ == "__main__":
cases_dir=args.cases_dir,
src_dir=args.src_dir,
env_tags=env_tags,
parallel=parallel_args,
opts=' '.join(args.pytest_args),
img=DIND_INTEGRATION_TESTS_IMAGE_NAME + ":" + args.docker_image_version,
name=CONTAINER_NAME,