This commit is contained in:
Yatsishin Ilya 2021-02-19 15:58:11 +03:00
parent 2df4317aba
commit ee955038c1
7 changed files with 158 additions and 147 deletions

View File

@ -7,7 +7,7 @@ services:
- data1-1:/data1
- ${MINIO_CERTS_DIR:-}:/certs
ports:
- "9001:9001"
- ${MINIO_EXTERNAL_PORT}:${MINIO_INTERNAL_PORT}
environment:
MINIO_ACCESS_KEY: minio
MINIO_SECRET_KEY: minio123

View File

@ -36,7 +36,7 @@ DEFAULT_ENV_NAME = '.env'
SANITIZER_SIGN = "=================="
# to create docker-compose env file
def _create_env_file(path, variables):
logging.debug("Env {} stored in {}".format(variables, path))
with open(path, 'w') as f:
@ -44,13 +44,6 @@ def _create_env_file(path, variables):
f.write("=".join([var, value]) + "\n")
return path
def env_to_compose_args(env):
args = []
for key, value in env.items():
args += ["-e", "{}={}".format(key, value)]
return args
def run_and_check(args, env=None, shell=False):
res = subprocess.run(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env, shell=shell)
if res.returncode != 0:
@ -174,7 +167,8 @@ class ClickHouseCluster:
self.with_cassandra = False
self.with_minio = False
self.minio_certs_dir = None
self.minio_dir = os.path.join(self.instances_dir, "minio")
self.minio_certs_dir = None # source for certificates
self.minio_host = "minio1"
self.minio_bucket = "root"
self.minio_bucket_2 = "root2"
@ -347,6 +341,19 @@ class ClickHouseCluster:
'--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')]
return self.base_mongo_cmd
def setup_minio_cmd(self, instance, env_variables, docker_compose_yml_dir):
self.with_minio = True
cert_d = p.join(self.minio_dir, "certs")
env_variables['MINIO_CERTS_DIR'] = cert_d
env_variables['MINIO_EXTERNAL_PORT'] = self.minio_port
env_variables['MINIO_INTERNAL_PORT'] = "9001"
env_variables['SSL_CERT_FILE'] = p.join(self.base_dir, cert_d, 'public.crt')
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')])
self.base_minio_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')]
return self.base_minio_cmd
def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
with_zookeeper=False, with_mysql=False, with_mysql8=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
@ -484,21 +491,13 @@ class ClickHouseCluster:
cmds.append(self.setup_redis_cmd(instance, env_variables, docker_compose_yml_dir))
if with_minio and not self.with_minio:
self.with_minio = True
self.minio_certs_dir = minio_certs_dir
if self.minio_certs_dir:
env_variables['MINIO_CERTS_DIR'] = p.join(self.base_dir, self.minio_certs_dir)
# Minio client (urllib3) uses SSL_CERT_FILE for certificate validation.
env_variables['SSL_CERT_FILE'] = p.join(self.base_dir, self.minio_certs_dir, 'public.crt')
cmds.append(self.setup_minio_cmd(instance, env_variables, docker_compose_yml_dir))
if minio_certs_dir is not None:
if self.minio_certs_dir is None:
self.minio_certs_dir = minio_certs_dir
else:
# Attach empty certificates directory to ensure non-secure mode.
minio_certs_dir = p.join(self.instances_dir, 'empty_minio_certs_dir')
os.makedirs(minio_certs_dir, exist_ok=True)
env_variables['MINIO_CERTS_DIR'] = minio_certs_dir
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')])
self.base_minio_cmd = ['docker-compose', '--env-file', instance.env_file, '--project-name', self.project_name,
'--file', p.join(docker_compose_yml_dir, 'docker_compose_minio.yml')]
cmds.append(self.base_minio_cmd)
raise Exception("Overwriting minio certs dir")
if with_cassandra and not self.with_cassandra:
self.with_cassandra = True
@ -659,6 +658,7 @@ class ClickHouseCluster:
raise Exception("Cannot wait ZooKeeper container")
def make_hdfs_api(self, timeout=60, kerberized=False):
hdfs_api = None
if kerberized:
keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
@ -666,27 +666,29 @@ class ClickHouseCluster:
# logging.debug("kerberizedhdfs1 ip ", hdfs_ip)
kdc_ip = self.get_instance_ip('hdfskerberos')
# logging.debug("kdc_ip ", kdc_ip)
self.hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
host="kerberizedhdfs1",
protocol="http",
proxy_port=50070,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
host="kerberizedhdfs1",
protocol="http",
proxy_port=50070,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
else:
self.hdfs_api = HDFSApi(user="root", host=self.hdfs_host)
logging.debug("Create HDFSApi host={}".format("localhost"))
hdfs_api = HDFSApi(user="root", host="localhost", data_port=self.hdfs_data_port, proxy_port=self.hdfs_name_port)
return hdfs_api
def wait_hdfs_to_start(self, timeout=60):
def wait_hdfs_to_start(self, hdfs_api, timeout=60):
start = time.time()
while time.time() - start < timeout:
try:
self.hdfs_api.write_data("/somefilewithrandomname222", "1")
hdfs_api.write_data("/somefilewithrandomname222", "1")
logging.debug("Connected to HDFS and SafeMode disabled! ")
return
except Exception as ex:
@ -710,7 +712,7 @@ class ClickHouseCluster:
time.sleep(1)
def wait_minio_to_start(self, timeout=30, secure=False):
minio_client = Minio('localhost:9001',
minio_client = Minio('localhost:{}'.format(self.minio_port),
access_key='minio',
secret_key='minio123',
secure=secure)
@ -840,14 +842,14 @@ class ClickHouseCluster:
logging.debug('Setup HDFS')
os.makedirs(self.hdfs_logs_dir)
subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.make_hdfs_api()
self.wait_hdfs_to_start(50)
hdfs_api = self.make_hdfs_api()
self.wait_hdfs_to_start(hdfs_api, 120)
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
logging.debug('Setup kerberized HDFS')
run_and_check(self.base_kerberized_hdfs_cmd + common_opts)
self.make_hdfs_api(kerberized=True)
self.wait_hdfs_to_start(timeout=300)
hdfs_api = self.make_hdfs_api(kerberized=True)
self.wait_hdfs_to_start(hdfs_api, timeout=300)
if self.with_mongo and self.base_mongo_cmd:
logging.debug('Setup Mongo')
@ -860,6 +862,13 @@ class ClickHouseCluster:
time.sleep(10)
if self.with_minio and self.base_minio_cmd:
# Copy minio certificates to minio/certs
os.mkdir(self.minio_dir)
if self.minio_certs_dir is None:
os.mkdir(os.path.join(self.minio_dir, 'certs'))
else:
shutil.copytree(self.minio_certs_dir, os.path.join(self.minio_dir, 'certs'))
minio_start_cmd = self.base_minio_cmd + common_opts
logging.info("Trying to create Minio instance by command %s", ' '.join(map(str, minio_start_cmd)))

View File

@ -11,15 +11,6 @@ import tempfile
import logging
import os
g_dns_hook = None
def custom_getaddrinfo(*args):
# print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
ret = g_dns_hook.custom_getaddrinfo(*args)
# print("g_dns_hook.custom_getaddrinfo result", ret)
return ret
class mk_krb_conf(object):
def __init__(self, krb_conf, kdc_ip):
self.krb_conf = krb_conf
@ -37,32 +28,6 @@ class mk_krb_conf(object):
if self.amended_krb_conf is not None:
self.amended_krb_conf.close()
# tweak dns resolution to connect to localhost where api_host is in URL
class dns_hook(object):
def __init__(self, hdfs_api):
# print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
self.hdfs_api = hdfs_api
def __enter__(self):
global g_dns_hook
g_dns_hook = self
# print("g_dns_hook is None ", g_dns_hook is None)
self.original_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = custom_getaddrinfo
return self
def __exit__(self, type, value, traceback):
global g_dns_hook
g_dns_hook = None
socket.getaddrinfo = self.original_getaddrinfo
def custom_getaddrinfo(self, *args):
(hostname, port) = args[:2]
# print("top of custom_getaddrinfo", hostname, port)
if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
# print("dns_hook substitute")
return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))]
else:
return self.original_getaddrinfo(*args)
class HDFSApi(object):
def __init__(self, user, timeout=100, kerberized=False, principal=None,
keytab=None, krb_conf=None,
@ -83,9 +48,9 @@ class HDFSApi(object):
# logging.basicConfig(level=logging.DEBUG)
# logging.getLogger().setLevel(logging.DEBUG)
# requests_log = logging.getLogger("requests.packages.urllib3")
# requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
if kerberized:
self._run_kinit()
@ -101,13 +66,13 @@ class HDFSApi(object):
raise Exception("kerberos principal and keytab are required")
with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
# print("instantiated_krb_conf ", instantiated_krb_conf)
logging.debug("instantiated_krb_conf ", instantiated_krb_conf)
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
# print(cmd)
logging.debug(cmd)
start = time.time()
@ -123,17 +88,15 @@ class HDFSApi(object):
raise Exception("Kinit running failure")
def read_data(self, path, universal_newlines=True):
with dns_hook(self):
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
logging.debug("read_data protocol:{} host:{} port:{} path: {}".format(self.protocol, self.host, self.proxy_port, path))
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
if response.status_code != 307:
response.raise_for_status()
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'])
# print("redirected to ", url)
with dns_hook(self):
response_data = requests.get(url,
headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
url = "{location}".format(location=response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port)))
logging.debug("redirected to {}".format(url))
response_data = requests.get(url, headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
if response_data.status_code != 200:
response_data.raise_for_status()
if universal_newlines:
@ -142,6 +105,7 @@ class HDFSApi(object):
return response_data.content
def write_data(self, path, content):
logging.debug("write_data protocol:{} host:{} port:{} path: {} user:{}".format(self.protocol, self.host, self.proxy_port, path, self.user))
named_file = NamedTemporaryFile(mode='wb+')
fpath = named_file.name
if isinstance(content, str):
@ -149,40 +113,41 @@ class HDFSApi(object):
named_file.write(content)
named_file.flush()
if self.kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
# print(self.kerberos_auth)
logging.debug(self.kerberos_auth)
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host='localhost',
port=self.proxy_port,
path=path, user=self.user),
allow_redirects=False,
headers={'host': 'localhost'},
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
logging.debug("HDFS api response:{}".format(response.headers))
with dns_hook(self):
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host=self.host,
port=self.proxy_port,
path=path, user=self.user),
allow_redirects=False,
headers={'host': 'localhost'},
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
if response.status_code != 307:
# print(response.headers)
response.raise_for_status()
additional_params = '&'.join(
response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
# additional_params = '&'.join(
# response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
location = response.headers['Location'].replace("hdfs1:50075", "{}:{}".format(self.host, self.data_port))
with dns_hook(self), open(fpath, mode="rb") as fh:
with open(fpath, mode="rb") as fh:
file_data = fh.read()
protocol = "http" # self.protocol
response = requests.put(
"{location}".format(location=response.headers['Location']),
"{location}".format(location=location),
data=file_data,
headers={'content-type':'text/plain', 'host': 'localhost'},
params={'file': path, 'user.name' : self.user},
allow_redirects=False, verify=False, auth=self.kerberos_auth
)
# print(response)
logging.debug(response)
if response.status_code != 201:
response.raise_for_status()

View File

@ -100,8 +100,10 @@ def test_table_function_remote(start_cluster):
def test_redirect(start_cluster):
start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n")
assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n"
hdfs_api = start_cluster.make_hdfs_api()
hdfs_api.write_data("/simple_storage", "1\t\n")
assert hdfs_api.read_data("/simple_storage") == "1\t\n"
node7.query(
"CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)")
assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1")

View File

@ -17,8 +17,10 @@ def started_cluster():
def test_url_without_redirect(started_cluster):
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
hdfs_api = started_cluster.make_hdfs_api()
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access datanode port directly
node1.query(
@ -27,8 +29,10 @@ def test_url_without_redirect(started_cluster):
def test_url_with_redirect_not_allowed(started_cluster):
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
hdfs_api = started_cluster.make_hdfs_api()
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port without allowing redirects
node1.query(
@ -38,8 +42,10 @@ def test_url_with_redirect_not_allowed(started_cluster):
def test_url_with_redirect_allowed(started_cluster):
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
hdfs_api = started_cluster.make_hdfs_api()
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port with allowing redirects
# http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0

View File

@ -18,14 +18,16 @@ def started_cluster():
def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query(
"create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query(
@ -36,8 +38,8 @@ def test_read_write_storage_with_globs(started_cluster):
"create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
for i in ["1", "2", "3"]:
started_cluster.hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
@ -67,23 +69,26 @@ def test_read_write_storage_with_globs(started_cluster):
def test_read_write_table(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function", data)
hdfs_api = started_cluster.make_hdfs_api()
assert started_cluster.hdfs_api.read_data("/simple_table_function") == data
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
assert hdfs_api.read_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_write_table(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')")
node1.query("insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)")
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert started_cluster.hdfs_api.read_data("/other_storage") == result
assert hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
@ -110,12 +115,14 @@ def test_bad_hdfs_uri(started_cluster):
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
globs_dir = "/dir_for_test_with_globs/"
files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file",
"some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
for filename in files:
started_cluster.hdfs_api.write_data(globs_dir + filename, some_data)
hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
@ -141,58 +148,70 @@ def test_globs_in_read_table(started_cluster):
def test_read_write_gzip_table(started_cluster):
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
hdfs_api = started_cluster.make_hdfs_api()
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
started_cluster.hdfs_api.write_gzip_data("/simple_table_function", data)
hdfs_api = started_cluster.make_hdfs_api()
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function") == data
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function", data)
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
def test_read_write_table_with_parameter_none(started_cluster):
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function.gz", data)
hdfs_api = started_cluster.make_hdfs_api()
assert started_cluster.hdfs_api.read_data("/simple_table_function.gz") == data
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_data("/simple_table_function.gz", data)
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
hdfs_api = started_cluster.make_hdfs_api()
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
def test_write_gz_storage(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert started_cluster.hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
if __name__ == '__main__':

View File

@ -23,10 +23,12 @@ def started_cluster():
cluster.shutdown()
def test_read_table(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function", data)
hdfs_api = started_cluster.make_hdfs_api()
api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
api_read = hdfs_api.read_data("/simple_table_function")
assert api_read == data
select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
@ -34,10 +36,12 @@ def test_read_table(started_cluster):
def test_read_write_storage(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage1', 'TSV')")
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
api_read = hdfs_api.read_data("/simple_storage1")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorage2")
@ -45,12 +49,14 @@ def test_read_write_storage(started_cluster):
def test_write_storage_not_expired(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage_not_expired', 'TSV')")
time.sleep(45) # wait for ticket expiration
node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired")
api_read = hdfs_api.read_data("/simple_storage_not_expired")
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
@ -58,6 +64,8 @@ def test_write_storage_not_expired(started_cluster):
def test_two_users(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/storage_user_one', 'TSV')")
node1.query("insert into HDFSStorOne values (1, 'Real', 86.00)")
@ -69,8 +77,10 @@ def test_two_users(started_cluster):
select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
def test_read_table_expired(started_cluster):
hdfs_api = started_cluster.make_hdfs_api()
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)
hdfs_api.write_data("/simple_table_function_relogin", data)
started_cluster.pause_container('hdfskerberos')
time.sleep(45)