mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-14 10:22:10 +00:00
156 lines
5.1 KiB
Python
156 lines
5.1 KiB
Python
import time
|
|
import pytest
|
|
|
|
import os
|
|
|
|
from helpers.cluster import ClickHouseCluster, is_arm
|
|
import subprocess
|
|
|
|
if is_arm():
|
|
pytestmark = pytest.mark.skip
|
|
|
|
cluster = ClickHouseCluster(__file__)
|
|
node1 = cluster.add_instance(
|
|
"node1",
|
|
with_kerberized_hdfs=True,
|
|
user_configs=[],
|
|
main_configs=["configs/hdfs.xml"],
|
|
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
|
|
def started_cluster():
|
|
try:
|
|
cluster.start()
|
|
|
|
yield cluster
|
|
|
|
except Exception as ex:
|
|
print(ex)
|
|
raise ex
|
|
finally:
|
|
cluster.shutdown()
|
|
|
|
|
|
def test_read_table(started_cluster):
|
|
hdfs_api = started_cluster.hdfs_api
|
|
|
|
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
|
|
hdfs_api.write_data("/simple_table_function", data)
|
|
|
|
api_read = hdfs_api.read_data("/simple_table_function")
|
|
assert api_read == data
|
|
|
|
select_read = node1.query(
|
|
"select * from hdfs('hdfs://kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')"
|
|
)
|
|
assert select_read == data
|
|
|
|
|
|
def test_read_write_storage(started_cluster):
|
|
hdfs_api = started_cluster.hdfs_api
|
|
|
|
node1.query(
|
|
"create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage1', 'TSV')"
|
|
)
|
|
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
|
|
|
|
api_read = hdfs_api.read_data("/simple_storage1")
|
|
assert api_read == "1\tMark\t72.53\n"
|
|
|
|
select_read = node1.query("select * from SimpleHDFSStorage2")
|
|
assert select_read == "1\tMark\t72.53\n"
|
|
|
|
|
|
def test_write_storage_not_expired(started_cluster):
|
|
hdfs_api = started_cluster.hdfs_api
|
|
|
|
node1.query(
|
|
"create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/simple_storage_not_expired', 'TSV')"
|
|
)
|
|
|
|
time.sleep(15) # wait for ticket expiration
|
|
node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")
|
|
|
|
api_read = hdfs_api.read_data("/simple_storage_not_expired")
|
|
assert api_read == "1\tMark\t72.53\n"
|
|
|
|
select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
|
|
assert select_read == "1\tMark\t72.53\n"
|
|
|
|
|
|
def test_two_users(started_cluster):
|
|
hdfs_api = started_cluster.hdfs_api
|
|
|
|
node1.query(
|
|
"create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9010/storage_user_one', 'TSV')"
|
|
)
|
|
node1.query("insert into HDFSStorOne values (1, 'Real', 86.00)")
|
|
|
|
node1.query(
|
|
"create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/user/specuser/storage_user_two', 'TSV')"
|
|
)
|
|
node1.query("insert into HDFSStorTwo values (1, 'Ideal', 74.00)")
|
|
|
|
select_read_1 = node1.query(
|
|
"select * from hdfs('hdfs://kerberizedhdfs1:9010/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')"
|
|
)
|
|
|
|
select_read_2 = node1.query(
|
|
"select * from hdfs('hdfs://suser@kerberizedhdfs1:9010/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')"
|
|
)
|
|
|
|
|
|
def test_read_table_expired(started_cluster):
|
|
hdfs_api = started_cluster.hdfs_api
|
|
|
|
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
|
|
hdfs_api.write_data("/simple_table_function_relogin", data)
|
|
|
|
started_cluster.pause_container("hdfskerberos")
|
|
time.sleep(15)
|
|
|
|
try:
|
|
select_read = node1.query(
|
|
"select * from hdfs('hdfs://reloginuser&kerberizedhdfs1:9010/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')"
|
|
)
|
|
assert False, "Exception have to be thrown"
|
|
except Exception as ex:
|
|
assert "DB::Exception: KerberosInit failure:" in str(ex)
|
|
|
|
started_cluster.unpause_container("hdfskerberos")
|
|
|
|
|
|
def test_prohibited(started_cluster):
|
|
node1.query(
|
|
"create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited', 'TSV')"
|
|
)
|
|
try:
|
|
node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
|
|
assert False, "Exception have to be thrown"
|
|
except Exception as ex:
|
|
assert (
|
|
"Unable to open HDFS file: /storage_user_two_prohibited (hdfs://suser@kerberizedhdfs1:9010/storage_user_two_prohibited) error: Permission denied: user=specuser, access=WRITE"
|
|
in str(ex)
|
|
)
|
|
|
|
|
|
def test_cache_path(started_cluster):
|
|
node1.query(
|
|
"create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9010/storage_dedicated_cache_path', 'TSV')"
|
|
)
|
|
try:
|
|
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
|
|
assert False, "Exception have to be thrown"
|
|
except Exception as ex:
|
|
assert (
|
|
"DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user"
|
|
in str(ex)
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
cluster.start()
|
|
input("Cluster created, press any key to destroy...")
|
|
cluster.shutdown()
|