Fix kerberized HDFS and cleanup

Yatsishin Ilya 2021-06-09 16:53:16 +03:00
parent ea3090f9c8
commit dd3fd87e26
5 changed files with 31 additions and 41 deletions


@@ -1,6 +1,8 @@
 import subprocess
+from helpers.cluster import run_and_check
 import pytest
 import logging
+import os
 from helpers.test_tools import TSV
 from helpers.network import _NetworkManager
@@ -8,25 +10,22 @@ from helpers.network import _NetworkManager
 def cleanup_environment():
     _NetworkManager.clean_all_user_iptables_rules()
     try:
-        result = subprocess.run(['docker', 'container', 'list', '-a', '|', 'wc', '-l'])
-        if result.returncode != 0:
-            logging.error(f"docker ps returned error:{str(result.stderr)}")
-        else:
-            if int(result.stdout) > 1:
-                if env["PYTEST_CLEANUP_CONTAINERS"] != 1:
-                    logging.warning(f"Docker containters({result.stdout}) are running before tests run. They can be left from previous pytest run and cause test failures.\n"\
-                        "You can set env PYTEST_CLEANUP_CONTAINERS=1 or use runner with --cleanup-containers argument to enable automatic containers cleanup.")
-                else:
-                    logging.debug("Trying to kill unstopped containers...")
-                    subprocess.run(['docker', 'kill', f'`docker container list -a`'])
-                    subprocess.run(['docker', 'rm', f'`docker container list -a`'])
-                    logging.debug("Unstopped containers killed")
-                    r = subprocess.run(['docker-compose', 'ps', '--services', '--all'])
-                    logging.debug(f"Docker ps before start:{r.stdout}")
-            else:
-                logging.debug(f"No running containers")
+        result = run_and_check(['docker ps | wc -l'], shell=True)
+        if int(result) > 1:
+            if os.environ.get("PYTEST_CLEANUP_CONTAINERS") != "1":
+                logging.warning(f"Docker containers({int(result)}) are running before tests run. They can be left from previous pytest run and cause test failures.\n"\
+                    "You can set env PYTEST_CLEANUP_CONTAINERS=1 or use runner with --cleanup-containers argument to enable automatic containers cleanup.")
+            else:
+                logging.debug("Trying to kill unstopped containers...")
+                run_and_check([f'docker kill $(docker container list --all --quiet)'], shell=True, nothrow=True)
+                run_and_check([f'docker rm $(docker container list --all --quiet)'], shell=True, nothrow=True)
+                logging.debug("Unstopped containers killed")
+                r = run_and_check(['docker-compose', 'ps', '--services', '--all'])
+                logging.debug(f"Docker ps before start:{r.stdout}")
+        else:
+            logging.debug(f"No running containers")
     except Exception as e:
-        logging.error(f"cleanup_environment:{str(e)}")
+        logging.exception(f"cleanup_environment:{str(e)}")
         pass
     yield
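
Taken together, the new conftest path counts containers with "docker ps | wc -l", warns when leftovers are found, and only removes them when PYTEST_CLEANUP_CONTAINERS=1 is set. A minimal standalone sketch of that logic, using plain subprocess instead of the repo's run_and_check helper and assuming the Docker CLI is on PATH:

import logging
import os
import subprocess

def cleanup_leftover_containers():
    # docker ps always prints a header line, so an empty list yields "1".
    result = subprocess.run("docker ps | wc -l", shell=True, capture_output=True, text=True)
    if int(result.stdout.strip()) <= 1:
        logging.debug("No running containers")
        return
    if os.environ.get("PYTEST_CLEANUP_CONTAINERS") != "1":
        logging.warning("Docker containers are running before tests run; "
                        "set PYTEST_CLEANUP_CONTAINERS=1 to remove them automatically.")
        return
    # Mirror nothrow=True: best-effort kill/rm, failures are ignored.
    subprocess.run("docker kill $(docker container list --all --quiet)", shell=True)
    subprocess.run("docker rm $(docker container list --all --quiet)", shell=True)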


@@ -62,12 +62,14 @@ def run_and_check(args, env=None, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
     err = res.stderr.decode('utf-8')
     if res.returncode != 0:
         # check_call(...) from subprocess does not print stderr, so we do it manually
+        logging.debug(f"Command:{args}")
         logging.debug(f"Stderr:{err}")
         logging.debug(f"Stdout:{out}")
         logging.debug(f"Env: {env}")
         if not nothrow:
             raise Exception(f"Command {args} return non-zero code {res.returncode}: {res.stderr.decode('utf-8')}")
     else:
+        logging.debug(f"Command:{args}")
         logging.debug(f"Stderr: {err}")
         logging.debug(f"Stdout: {out}")
     return out
@@ -375,11 +377,11 @@ class ClickHouseCluster:
     def cleanup(self):
         # Just in case kill unstopped containers from previous launch
         try:
-            result = run_and_check(['docker', 'container', 'list', '-a', '-f name={self.project_name}', '|', 'wc', '-l'])
+            result = run_and_check(f'docker container list --all --filter name={self.project_name} | wc -l', shell=True)
             if int(result) > 1:
-                logging.debug("Trying to kill unstopped containers...")
-                run_and_check(['docker', 'kill', f'`docker container list -a -f name={self.project_name}`'])
-                run_and_check(['docker', 'rm', f'`docker container list -a -f name={self.project_name}`'])
+                logging.debug(f"Trying to kill unstopped containers for project {self.project_name}...")
+                run_and_check(f'docker kill $(docker container list --all --quiet --filter name={self.project_name})', shell=True)
+                run_and_check(f'docker rm $(docker container list --all --quiet --filter name={self.project_name})', shell=True)
                 logging.debug("Unstopped containers killed")
                 run_and_check(['docker-compose', 'ps', '--services', '--all'])
             else:
@@ -409,8 +411,10 @@ class ClickHouseCluster:
         try:
             logging.debug("Trying to prune unused volumes...")
-            run_and_check(['docker', 'volume', 'prune', '-f'])
-            logging.debug("Volumes pruned")
+            result = run_and_check(['docker volume ls | wc -l'], shell=True)
+            if int(result) > 0:
+                run_and_check(['docker', 'volume', 'prune', '-f'])
+                logging.debug(f"Volumes pruned: {result}")
         except:
             pass
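
run_and_check is now called in two forms in this change; a short usage sketch, assuming only the parameters visible in this diff (args, shell, nothrow) and that the helper returns decoded stdout as shown above:

from helpers.cluster import run_and_check

# List form: arguments go straight to subprocess, no shell is involved.
run_and_check(['docker-compose', 'ps', '--services', '--all'])

# Shell form: the command string is interpreted by the shell, so pipes and
# $(...) substitution work; nothrow=True logs a failure instead of raising.
count = run_and_check('docker ps | wc -l', shell=True)
run_and_check('docker kill $(docker container list --all --quiet)',
              shell=True, nothrow=True)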


@@ -106,12 +106,12 @@ class HDFSApi(object):
     def read_data(self, path, universal_newlines=True):
-        logging.debug("read_data protocol:{} host:{} proxy port:{} data port:{} path: {}".format(self.protocol, self.host, self.proxy_port, self.data_port, path))
-        response = self.req_wrapper(requests.get, 307, url="{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': str(self.hdfs_ip)}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
+        logging.debug("read_data protocol:{} host:{} ip:{} proxy port:{} data port:{} path: {}".format(self.protocol, self.host, self.hdfs_ip, self.proxy_port, self.data_port, path))
+        response = self.req_wrapper(requests.get, 307, url="{protocol}://{ip}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, ip=self.hdfs_ip, port=self.proxy_port, path=path), headers={'host': str(self.hdfs_ip)}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
         # additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
         location = None
         if self.kerberized:
-            location = response.headers['Location'].replace("kerberizedhdfs1:9010", "{}:{}".format(self.hdfs_ip, self.proxy_port))
+            location = response.headers['Location'].replace("kerberizedhdfs1:1006", "{}:{}".format(self.hdfs_ip, self.data_port))
         else:
             location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.hdfs_ip, self.data_port))
         logging.debug("redirected to {}".format(location))


@@ -144,7 +144,7 @@ class _NetworkManager:
             res = subprocess.run("iptables -D DOCKER-USER 1", shell=True)
             if res.returncode != 0:
-                logging.info("All iptables rules cleared, " + str(iptables_iter) + "iterations, last error: " + str(res.stderr))
+                logging.info("All iptables rules cleared, " + str(iptables_iter) + " iterations, last error: " + str(res.stderr))
                 return

     @staticmethod
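
For context, this log line belongs to the iptables cleanup that conftest invokes via _NetworkManager.clean_all_user_iptables_rules(): rule 1 of the DOCKER-USER chain is deleted in a loop until iptables reports the chain is empty. A minimal sketch; the iteration cap and the stderr capture are illustrative:

import logging
import subprocess

def clean_all_user_iptables_rules(max_iterations=1000):
    # Deleting rule 1 shifts the remaining rules up; once the chain is empty,
    # iptables exits with a non-zero code and the loop stops.
    for iptables_iter in range(max_iterations):
        res = subprocess.run("iptables -D DOCKER-USER 1", shell=True, stderr=subprocess.PIPE)
        if res.returncode != 0:
            logging.info("All iptables rules cleared, " + str(iptables_iter)
                         + " iterations, last error: " + str(res.stderr))
            return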


@@ -22,8 +22,7 @@ def started_cluster():
     finally:
         cluster.shutdown()

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_read_table(started_cluster):
     hdfs_api = started_cluster.hdfs_api
@@ -36,8 +35,6 @@ def test_read_table(started_cluster):
     select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
     assert select_read == data

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_read_write_storage(started_cluster):
     hdfs_api = started_cluster.hdfs_api
@@ -50,8 +47,6 @@ def test_read_write_storage(started_cluster):
     select_read = node1.query("select * from SimpleHDFSStorage2")
     assert select_read == "1\tMark\t72.53\n"

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_write_storage_not_expired(started_cluster):
     hdfs_api = started_cluster.hdfs_api
@@ -66,8 +61,6 @@ def test_write_storage_not_expired(started_cluster):
     select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
     assert select_read == "1\tMark\t72.53\n"

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_two_users(started_cluster):
     hdfs_api = started_cluster.hdfs_api
@@ -81,8 +74,6 @@ def test_two_users(started_cluster):
     select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_read_table_expired(started_cluster):
     hdfs_api = started_cluster.hdfs_api
@@ -100,8 +91,6 @@ def test_read_table_expired(started_cluster):
     started_cluster.unpause_container('hdfskerberos')

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_prohibited(started_cluster):
     node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited', 'TSV')")
     try:
@@ -110,8 +99,6 @@ def test_prohibited(started_cluster):
     except Exception as ex:
         assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)

-# TODO Remove it and enable test
-@pytest.mark.skip(reason="Don't work in parallel mode for some reason")
 def test_cache_path(started_cluster):
     node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9010/storage_dedicated_cache_path', 'TSV')")
     try: