HADOOP_SECURE_DN_USER way, kinit thread, docker capabilities

Ilya Golshtein 2020-09-28 20:20:04 +03:00
parent eb10948ca2
commit d9166a0645
19 changed files with 597 additions and 163 deletions

View File

@ -2,6 +2,7 @@ version: '2.3'
services:
hdfs1:
image: sequenceiq/hadoop-docker:2.7.0
hostname: hdfs1
restart: always
ports:
- 50075:50075

View File

@ -2,7 +2,9 @@ version: '2.3'
services:
kerberizedhdfs1:
image: sequenceiq/hadoop-docker:2.7.0
cap_add:
- CAP_DAC_READ_SEARCH
image: ilejn/kerberized-hadoop:latest
hostname: kerberizedhdfs1
restart: always
volumes:
@ -10,22 +12,18 @@ services:
- ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
- ${KERBERIZED_HDFS_DIR}/secrets/krb.conf:/etc/krb5.conf:ro
ports:
- 50475:50475
- 50470:50470
- 1006:1006
- 50070:50070
- 9000:9000
depends_on:
- hdfs_kerberos
- hdfskerberos
entrypoint: /etc/bootstrap.sh -d
hdfs_kerberos:
# build: ${KERBERIZED_KAFKA_DIR}
image: arenadata/kdc:latest
hostname: hdfs_kerberos
hdfskerberos:
image: yandex/clickhouse-kerberos-kdc:latest
hostname: hdfskerberos
volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_HDFS_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]
networks:
default:
aliases:
- hdfs.local

View File

@ -108,6 +108,87 @@ Create table with files named `file000`, `file001`, … , `file999`:
``` sql
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## Configuration {#configuration}
Similar to GraphiteMergeTree, the HDFS engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`hdfs`) and user-level (`hdfs_*`). The global configuration is applied first, and then the user-level configuration is applied (if it exists).
``` xml
<!-- Global configuration options for HDFS engine type -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- Configuration specific for user "root" -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
```
### List of possible configuration options with default values
#### Supported by libhdfs3
| **parameter** | **default value** |
| --- | --- |
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
| input\_notretry-another-node | false |
| input\_localread\_mappedfile | true |
| dfs\_client\_use\_legacy\_blockreader\_local | false |
| rpc\_client\_ping\_interval | 10 * 1000 |
| rpc\_client\_connect\_timeout | 600 * 1000 |
| rpc\_client\_read\_timeout | 3600 * 1000 |
| rpc\_client\_write\_timeout | 3600 * 1000 |
| rpc\_client\_socekt\_linger\_timeout | -1 |
| rpc\_client\_connect\_retry | 10 |
| rpc\_client\_timeout | 3600 * 1000 |
| dfs\_default\_replica | 3 |
| input\_connect\_timeout | 600 * 1000 |
| input\_read\_timeout | 3600 * 1000 |
| input\_write\_timeout | 3600 * 1000 |
| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
| dfs\_prefetchsize | 10 |
| input\_read\_getblockinfo\_retry | 3 |
| input\_localread\_blockinfo\_cachesize | 1000 |
| input\_read\_max\_retry | 60 |
| output\_default\_chunksize | 512 |
| output\_default\_packetsize | 64 * 1024 |
| output\_default\_write\_retry | 10 |
| output\_connect\_timeout | 600 * 1000 |
| output\_read\_timeout | 3600 * 1000 |
| output\_write\_timeout | 3600 * 1000 |
| output\_close\_timeout | 3600 * 1000 |
| output\_packetpool\_size | 1024 |
| output\_heeartbeat\_interval | 10 * 1000 |
| dfs\_client\_failover\_max\_attempts | 15 |
| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
| dfs\_client\_socketcache\_expiryMsec | 3000 |
| dfs\_client\_socketcache\_capacity | 16 |
| dfs\_default\_blocksize | 64 * 1024 * 1024 |
| dfs\_default\_uri | "hdfs://localhost:9000" |
| hadoop\_security\_authentication | "simple" |
| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
| dfs\_client\_log\_severity | "INFO" |
| dfs\_domain\_socket\_path | "" |
See [HDFS Configuration Reference](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) for details.
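As an illustration, a libhdfs3 option from the table above can be tuned in the global `hdfs` section; underscores in the key are converted to dots before the value is handed to libhdfs3. The timeout value below is arbitrary:
``` xml
<hdfs>
    <!-- passed to libhdfs3 as rpc.client.connect.timeout (default is 600 * 1000) -->
    <rpc_client_connect_timeout>900000</rpc_client_connect_timeout>
</hdfs>
```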
#### ClickHouse extras {#clickhouse-extras}
- hadoop\_kerberos\_keytab
- hadoop\_kerberos\_principal
- hadoop\_kerberos\_kinit\_command
## Kerberos support {#kerberos-support}
If the hadoop\_security\_authentication parameter has the value 'kerberos', ClickHouse authenticates via Kerberos.
The [ClickHouse extras](#clickhouse-extras) parameters, as well as hadoop\_security\_kerberos\_ticket\_cache\_path, may be of help here; a configuration sketch is given below.
Note that due to libhdfs3 limitations only the old-fashioned approach is supported:
datanode communications are not secured by SASL (HADOOP\_SECURE\_DN\_USER is a reliable indicator of that approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
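A minimal configuration sketch for a kerberized setup follows; the keytab path, principal, and cache location are placeholders taken from the test environment, not required values:
``` xml
<hdfs>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
    <!-- optional: kinit command used to (re)acquire tickets and a dedicated credential cache -->
    <hadoop_kerberos_kinit_command>kinit</hadoop_kerberos_kinit_command>
    <hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs>
```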
## Virtual Columns {#virtual-columns}
@ -118,4 +199,5 @@ CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)
[Original article](https://clickhouse.tech/docs/en/operations/table_engines/hdfs/) <!--hide-->

View File

@ -148,6 +148,11 @@ using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePool
class BackgroundSchedulePoolTaskHolder
{
public:
using CleanupFunc = std::function<void()>;
CleanupFunc cleanup_func;
BackgroundSchedulePoolTaskHolder() = default;
explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
@ -159,6 +164,8 @@ public:
{
if (task_info)
task_info->deactivate();
if (cleanup_func)
cleanup_func();
}
operator bool() const { return task_info != nullptr; }
@ -166,6 +173,8 @@ public:
BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }
void setCleanupFunc(const CleanupFunc & function) { cleanup_func = function; }
private:
BackgroundSchedulePoolTaskInfoPtr task_info;
};
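The new hook follows a plain RAII-callback pattern: whatever is registered via setCleanupFunc runs after the task is deactivated in the holder's destructor. A standalone sketch of that pattern (simplified stand-in types, not the actual BackgroundSchedulePool API):
``` cpp
#include <functional>
#include <iostream>
#include <memory>

// Simplified stand-in for BackgroundSchedulePoolTaskInfo.
struct TaskInfo
{
    void deactivate() { std::cout << "task deactivated\n"; }
};

// Holder that deactivates its task and then runs an optional cleanup callback,
// mirroring the CleanupFunc added to BackgroundSchedulePoolTaskHolder.
class TaskHolder
{
public:
    using CleanupFunc = std::function<void()>;

    explicit TaskHolder(std::shared_ptr<TaskInfo> info) : task_info(std::move(info)) {}

    void setCleanupFunc(const CleanupFunc & func) { cleanup_func = func; }

    ~TaskHolder()
    {
        if (task_info)
            task_info->deactivate();
        if (cleanup_func)
            cleanup_func();   /// e.g. deregister a periodic kinit task
    }

private:
    std::shared_ptr<TaskInfo> task_info;
    CleanupFunc cleanup_func;
};

int main()
{
    TaskHolder holder(std::make_shared<TaskInfo>());
    holder.setCleanupFunc([] { std::cout << "cleanup ran\n"; });
}   /// prints "task deactivated" then "cleanup ran"
```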

View File

@ -27,7 +27,6 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
// hdfsBuilderConfSetStr(rawBuilder, "hadoop.security.authentication", "kerberos");
// hdfsBuilderConfSetStr(rawBuilder, "dfs.client.log.severity", "TRACE");
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(path, keys);
@ -35,14 +34,8 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
{
const String key_path = path + "." + key;
/* https://hadoop.apache.org/docs/r2.8.0/hadoop-project-dist/hadoop-common/core-default.xml */
String key_name;
if (key == "hadoop_security_auth_to_local")
{
key_name = "hadoop.security.auth_to_local";
}
else if (key == "hadoop_kerberos_keytab")
if (key == "hadoop_kerberos_keytab")
{
needKinit = true;
hadoop_kerberos_keytab = config.getString(key_path);
@ -52,20 +45,25 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
{
needKinit = true;
hadoop_kerberos_principal = config.getString(key_path);
hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
continue;
}
else if (key == "hadoop_kerberos_min_time_before_relogin")
else if (key == "hadoop_kerberos_kinit_command")
{
needKinit = true;
// time_relogin = config.getString(key_path);
hadoop_kerberos_kinit_command = config.getString(key_path);
continue;
}
else
else if (key == "hadoop_security_kerberos_ticket_cache_path")
{
key_name = boost::replace_all_copy(key, "_", ".");
hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
// standard param - pass to libhdfs3
}
auto & [k,v] = keep(key_name, config.getString(key_path));
key_name = boost::replace_all_copy(key, "_", ".");
const auto & [k,v] = keep(key_name, config.getString(key_path));
hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
}
}
@ -73,11 +71,38 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
String HDFSBuilderWrapper::getKinitCmd()
{
std::stringstream ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ? String() : (String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
ss << hadoop_kerberos_kinit_command << cache_name << " -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
return ss.str();
}
void HDFSBuilderWrapper::runKinit()
{
String cmd = getKinitCmd();
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
std::unique_lock<std::mutex> lck(kinit_mtx);
int ret = system(cmd.c_str());
if (ret)
{
throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR);
}
}
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context)
{
const Poco::URI uri(uri_str);
@ -114,10 +139,23 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
}
}
// hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos");
// hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE");
const auto & config = context.getConfigRef();
String user_info = uri.getUserInfo();
String user;
if (!user_info.empty() && user_info.front() != ':')
{
size_t delim_pos = user_info.find(':');
if (delim_pos != String::npos)
user = user_info.substr(0, delim_pos);
@ -131,9 +169,27 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
}
if (!user.empty())
{
String user_config_prefix = HDFSBuilderWrapper::CONFIG_PREFIX + "_" + user;
if (config.has(user_config_prefix))
builder.loadFromConfig(config, user_config_prefix);
}
if (builder.needKinit)
{
builder.runKinit();
}
return builder;
}
std::mutex HDFSBuilderWrapper::kinit_mtx;
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
@ -143,5 +199,7 @@ HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
return fs;
}
}
#endif

View File

@ -28,6 +28,47 @@ struct HDFSFsDeleter
}
};
#if 0
class KinitTaskHolder
{
using Container = std::map<std::string, BackgroundSchedulePool::TaskHolder>;
Container container;
String make_key(const HDFSBuilderWrapper & hbw)
{
return hbw.hadoop_kerberos_keytab + "^"
+ hbw.hadoop_kerberos_principal + "^"
+ std::to_string(time_relogin);
}
public:
using Descriptor = Container::iterator;
Descriptor addTask(const HDFSBuilderWrapper & hdfs_builder_wrapper)
{
auto key = make_key(hdfs_builder_wrapper);
auto it = container.find(key);
if (it == std::end(container))
{
it = container.insert({key, task}).first;
}
return it->second->getptr();
}
void delTask(Descriptor it)
{
container.erase(it);
}
};
#endif
}
struct HDFSFileInfo
@ -54,10 +95,13 @@ struct HDFSFileInfo
class HDFSBuilderWrapper
{
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
/*mutable*/ std::vector<std::pair<String, String>> config_stor;
@ -66,18 +110,23 @@ class HDFSBuilderWrapper
return config_stor.emplace_back(std::make_pair(k, v));
}
void
loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & path);
public:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & path);
String getKinitCmd();
static const String CONFIG_PREFIX;
bool
needKinit{false};
String
getKinitCmd();
void
runKinit();
static const String CONFIG_PREFIX;
public:
hdfsBuilder *
get()
@ -86,14 +135,15 @@ public:
}
HDFSBuilderWrapper()
: hdfs_builder(hdfsNewBuilder())
{
hdfs_builder = hdfsNewBuilder();
}
~HDFSBuilderWrapper()
{
hdfsFreeBuilder(hdfs_builder);
};
}
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;

View File

@ -1,6 +1,7 @@
#include "ReadBufferFromHDFS.h"
#if USE_HDFS
#include <Interpreters/Context.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
#include <mutex>
@ -58,12 +59,15 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
}
};
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context_, size_t buf_size)
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, context_))
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, context))
{
// auto modified_context = std::make_shared<Context>(context);
// impl = std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, modified_context);
}

View File

@ -20,6 +20,8 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
// ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
~ReadBufferFromHDFS() override;

View File

@ -2,6 +2,7 @@
#if USE_HDFS
#include <Interpreters/Context.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -36,9 +37,8 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
if (path.find_first_of("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
int flags = hdfsExists(fs.get(), path.c_str()) ? (O_WRONLY|O_APPEND) : O_WRONLY; /// O_WRONLY means create or overwrite, i.e. it implies O_TRUNC here
fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 0);
if (fout == nullptr)
{
@ -76,6 +76,8 @@ WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const C
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, context))
{
// auto modified_context = std::make_shared<Context>(context);
// impl = std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, modified_context);
}

View File

@ -176,7 +176,7 @@ public:
HDFSBlockOutputStream(const String & uri,
const String & format,
const Block & sample_block_,
const Context & context,
Context & context,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{

View File

@ -485,36 +485,41 @@ class ClickHouseCluster:
raise Exception("Cannot wait ZooKeeper container")
def wait_hdfs_to_start(self, timeout=600, kerberized=False):
def wait_hdfs_to_start(self, timeout=60, kerberized=False):
start = time.time()
if kerberized:
keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb.conf"))
hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
print("kerberizedhdfs1 ip ", hdfs_ip)
kdc_ip = self.get_instance_ip('hdfs_kerberos')
kdc_ip = self.get_instance_ip('hdfskerberos')
print("kdc_ip ", kdc_ip)
hdfs_api = HDFSApi(user="root",
self.hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="hdfsuser@TEST.CLICKHOUSE.TECH",
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
# host="kerberizedhdfs1.test.clickhouse.tech",
host="kerberizedhdfs1",
protocol="https",
proxy_port=50470,
data_port=50475,
protocol="http",
# protocol="https",
proxy_port=50070,
# proxy_port=50470,
# data_port=50475,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
# self.hdfs_api = hdfs_api
else:
hdfs_api = HDFSApi(user="root")
self.hdfs_api = HDFSApi(user="root", host="hdfs1")
# time.sleep(280)
# time.sleep(150)
# return
while time.time() - start < timeout:
try:
hdfs_api.write_data("/somefilewithrandomname222", "1")
self.hdfs_api.write_data("/somefilewithrandomname222", "1")
print("Connected to HDFS and SafeMode disabled! ")
return
except Exception as ex:
@ -677,7 +682,7 @@ class ClickHouseCluster:
files_to_cleanup.append(_create_env_file(self.base_dir, env_var, ".env"))
files_to_cleanup.append(_create_env_file(os.getcwd(), env_var, ".env"))
subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env_var)
self.wait_hdfs_to_start(120, kerberized=True)
self.wait_hdfs_to_start(kerberized=True, timeout=300)
remove_files(files_to_cleanup)
if self.with_mongo and self.base_mongo_cmd:
@ -935,7 +940,7 @@ class ClickHouseInstance:
if with_kerberized_kafka or with_kerberized_hdfs:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb_ch.conf:/etc/krb5.conf:ro"
else:
self.keytab_path = ""
self.krb5_conf = ""

View File

@ -27,7 +27,7 @@ class mk_krb_conf(object):
def __enter__(self):
with open(self.krb_conf) as f:
content = f.read()
amended_content = content.replace('hdfs_kerberos', self.kdc_ip)
amended_content = content.replace('hdfskerberos', self.kdc_ip)
self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False)
self.amended_krb_conf.write(amended_content)
self.amended_krb_conf.close()
@ -39,12 +39,12 @@ class mk_krb_conf(object):
class dns_hook(object):
def __init__(self, hdfs_api):
print("dns_hook.init ", hdfs_api.kerberized)
print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
self.hdfs_api = hdfs_api
def __enter__(self):
global g_dns_hook
g_dns_hook = self
if self.hdfs_api.kerberized:
if True: # self.hdfs_api.kerberized:
print("g_dns_hook is None ", g_dns_hook is None)
self.original_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = custom_getaddrinfo
@ -52,15 +52,15 @@ class dns_hook(object):
def __exit__(self, type, value, traceback):
global g_dns_hook
g_dns_hook = None
if self.hdfs_api.kerberized:
if True: # self.hdfs_api.kerberized:
socket.getaddrinfo = self.original_getaddrinfo
def custom_getaddrinfo(self, *args):
print("top of custom_getaddrinfo")
(hostname, port) = args[:2]
print("top of custom_getaddrinfo", hostname, port)
if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
print("dns_hook substitute")
return [(socket.AF_INET, 1, 6, '', (self.hdfs_api.hdfs_ip, port))]
return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))] #self.hdfs_api.hdfs_ip
else:
return self.original_getaddrinfo(*args)
@ -72,7 +72,7 @@ class HDFSApi(object):
def __init__(self, user, timeout=100, kerberized=False, principal=None,
keytab=None, krb_conf=None,
host = "localhost", protocol = "http",
proxy_port = "50070", data_port = "50075", hdfs_ip = None, kdc_ip = None):
proxy_port = 50070, data_port = 50075, hdfs_ip = None, kdc_ip = None):
self.host = host
self.protocol = protocol
self.proxy_port = proxy_port
@ -87,10 +87,18 @@ class HDFSApi(object):
self.krb_conf = krb_conf
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
if kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(principal=self.principal, hostname_override=self.host)
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
#principal=self.principal,
#hostname_override=self.host, principal=self.principal)
# , mutual_authentication=reqkerb.REQUIRED, force_preemptive=True)
else:
@ -105,7 +113,7 @@ class HDFSApi(object):
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "KRB5_CONFIG={instantiated_krb_conf} kinit -R -t {keytab} -k {principal} || KRB5_CONFIG={instantiated_krb_conf} kinit -R -t {keytab} -k {principal}".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
print(cmd)
@ -123,14 +131,19 @@ class HDFSApi(object):
raise Exception("Kinit running failure")
def read_data(self, path, universal_newlines=True):
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
with dns_hook(self):
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
if response.status_code != 307:
response.raise_for_status()
additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
response_data = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN&{params}".format(protocol=self.protocol, host=self.host, port=self.data_port, path=path, params=additional_params), headers={'host': 'localhost'}, verify=False, auth=self.kerberos_auth)
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'])
print("redirected to ", url)
with dns_hook(self):
response_data = requests.get(url,
headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
if response_data.status_code != 200:
response_data.raise_for_status()
if universal_newlines:
return response_data.text
else:
@ -144,7 +157,7 @@ class HDFSApi(object):
path=path,
params=params)
if self.kerberized:
cmd = "curl -k --negotiate -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
cmd = "curl -k --negotiate -s -i -X PUT -T {fname} -u : '{url}' --resolve {host}:{port}:127.0.0.1".format(fname=filename, url=url)
else:
cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
output = subprocess.check_output(cmd, shell=True)
@ -157,16 +170,29 @@ class HDFSApi(object):
content = content.encode()
named_file.write(content)
named_file.flush()
print("before request.put")
if self.kerberized:
print("before request.put", os.environ["KRB5_CONFIG"])
self._run_kinit()
# cmd = "klist"
# subprocess.call(cmd, shell=True)
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
print(self.kerberos_auth)
with dns_hook(self):
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host=self.host,
port=self.proxy_port,
path=path, user=self.user),
allow_redirects=False, headers={'host': 'localhost'}, verify=False, auth=self.kerberos_auth
allow_redirects=False,
headers={'host': 'localhost'},
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
print("after request.put", response.status_code)
if response.status_code != 307:
print(response.headers)
response.raise_for_status()
print("after status code check")
@ -174,20 +200,24 @@ class HDFSApi(object):
additional_params = '&'.join(
response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
if not self.kerberized:
if False: #not self.kerberized:
output = self._curl_to_put(fpath, path, additional_params)
if "201 Created" not in output:
raise Exception("Can't create file on hdfs:\n {}".format(output))
else:
with dns_hook(self), open(fpath) as fh:
file_data = fh.read()
protocol = "http" # self.protocol
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path, user=self.user, params=additional_params),
"{location}".format(location=response.headers['Location']),
data=file_data,
headers={'content-type':'text/plain', 'host': 'localhost'},
params={'file': path},
params={'file': path, 'user.name' : self.user},
allow_redirects=False, verify=False, auth=self.kerberos_auth
)
print(response)
if response.status_code != 201:
response.raise_for_status()
def write_gzip_data(self, path, content):

View File

@ -24,18 +24,14 @@ def started_cluster():
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query(
@ -46,8 +42,8 @@ def test_read_write_storage_with_globs(started_cluster):
"create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
for i in ["1", "2", "3"]:
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
@ -77,25 +73,23 @@ def test_read_write_storage_with_globs(started_cluster):
def test_read_write_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
started_cluster.hdfs_api.write_data("/simple_table_function", data)
assert hdfs_api.read_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_write_table(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')")
node1.query("insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)")
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert hdfs_api.read_data("/other_storage") == result
assert started_cluster.hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
@ -120,15 +114,14 @@ def test_bad_hdfs_uri(started_cluster):
print(ex)
assert "Unable to open HDFS file" in str(ex)
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
hdfs_api = HDFSApi("root")
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
globs_dir = "/dir_for_test_with_globs/"
files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file",
"some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
for filename in files:
hdfs_api.write_data(globs_dir + filename, some_data)
started_cluster.hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
@ -145,6 +138,7 @@ def test_globs_in_read_table(started_cluster):
for pattern, paths_amount, files_amount in test_requests:
inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
print("inside_table_func ", inside_table_func)
assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(
paths_amount)
@ -153,66 +147,60 @@ def test_globs_in_read_table(started_cluster):
def test_read_write_gzip_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function", data)
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
def test_read_write_table_with_parameter_none(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_data("/simple_table_function.gz", data)
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
def test_write_gz_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
if __name__ == '__main__':

View File

@ -1,7 +1,13 @@
<yandex>
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>hdfsuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<hdfs_specuser>
<hadoop_kerberos_principal>specuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_specuser>
<hdfs_dedicatedcachepath>
<hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs_dedicatedcachepath>
</yandex>

View File

@ -2,8 +2,18 @@
: ${HADOOP_PREFIX:=/usr/local/hadoop}
cat >> $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh <<EOF
export HADOOP_SECURE_DN_USER=hdfs
export HADOOP_SECURE_DN_PID_DIR=$HADOOP_PREFIX/pid
export HADOOP_SECURE_DN_LOG_DIR=$HADOOP_PREFIX/logs/hdfs
export JSVC_HOME=$HADOOP_PREFIX/sbin
EOF
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
mkdir -p ${HADOOP_SECURE_DN_PID_DIR}
mkdir -p ${HADOOP_SECURE_DN_LOG_DIR}
rm /tmp/*.pid
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
@ -17,28 +27,40 @@ cat >> /usr/local/hadoop/etc/hadoop/core-site.xml << EOF
<name>hadoop.security.authentication</name>
<value>kerberos</value> <!-- A value of "simple" would disable security. -->
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://0.0.0.0:9000</value>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://0.0.0.0:9000</value>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<!--
<property>
<name>hadoop.rpc.protection</name>
<value>privacy</value>
</property>
-->
</configuration>
EOF
cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!--
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- General HDFS security config -->
<property>
<name>dfs.block.access.token.enable</name>
@ -70,23 +92,32 @@ cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- DataNode security config -->
<!-- DataNode security config
<property>
<name>dfs.data.transfer.protection</name>
<name>dfs.data.transfer.protection</name>
<value>integrity</value>
</property>
-->
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>700</value>
</property>
<property>
<name>dfs.datanode.address</name>
<value>0.0.0.0:10004</value>
<value>0.0.0.0:1004</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:10006</value>
<value>0.0.0.0:1006</value>
</property>
<!--
<property>
<name>dfs.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
-->
<property>
<name>dfs.datanode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
@ -95,46 +126,54 @@ cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<name>dfs.datanode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
<!-- Web Authentication config -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
</configuration>
EOF
cat > /usr/local/hadoop/etc/hadoop/ssl-server.xml << EOF
<configuration>
<property>
<name>ssl.server.truststore.location</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
</property>
<property>
<name>ssl.server.truststore.password</name>
<value>masterkey</value>
</property>
<property>
<name>ssl.server.keystore.location</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
</property>
<property>
<name>ssl.server.keystore.password</name>
<value>masterkey</value>
</property>
<property>
<name>ssl.server.keystore.keypassword</name>
<value>masterkey</value>
<name>dfs.web.authentication.kerberos.keytab</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
</configuration>
EOF
# cat > /usr/local/hadoop/etc/hadoop/ssl-server.xml << EOF
# <configuration>
# <property>
# <name>ssl.server.truststore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.truststore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.keystore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.keypassword</name>
# <value>masterkey</value>
# </property>
# </configuration>
# EOF
cat > /usr/local/hadoop/etc/hadoop/log4j.properties << EOF
# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
@ -165,6 +204,7 @@ log4j.logger.org.apache.spark.executor=DEBUG
log4j.logger.org.apache.spark.scheduler=DEBUG
EOF
useradd -u 1098 hdfs
# keytool -genkey -alias kerberized_hdfs1.test.clickhouse.tech -keyalg rsa -keysize 1024 -dname "CN=kerberized_hdfs1.test.clickhouse.tech" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
keytool -genkey -alias kerberizedhdfs1 -keyalg rsa -keysize 1024 -dname "CN=kerberizedhdfs1" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
@ -173,11 +213,44 @@ chmod g+r /usr/local/hadoop/etc/hadoop/conf/hdfs.jks
service sshd start
# yum --quiet --assumeyes install krb5-workstation.x86_64
# yum --quiet --assumeyes install tcpdump
# cd /tmp
# curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz
# tar xzf commons-daemon-1.0.15-src.tar.gz
# cd commons-daemon-1.0.15-src/src/native/unix
# ./configure && make
# cp ./jsvc /usr/local/hadoop/sbin
until kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH; do sleep 2; done
echo "KDC is up and ready to go... starting up"
$HADOOP_PREFIX/sbin/start-dfs.sh
$HADOOP_PREFIX/sbin/start-yarn.sh
$HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver
yum --quiet --assumeyes install krb5-workstation.x86_64
chmod a+r /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab # TODO: create a dedicated keytab for hdfsuser instead of making hdfs.keytab world-readable
$HADOOP_PREFIX/sbin/start-secure-dns.sh
sleep 3
/usr/local/hadoop/bin/hdfs dfsadmin -safemode leave
/usr/local/hadoop/bin/hdfs dfs -mkdir /user/specuser
/usr/local/hadoop/bin/hdfs dfs -chown specuser /user/specuser
kdestroy
# adduser --groups hdfs hdfsuser
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start namenode
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start datanode
if [[ $1 == "-d" ]]; then
while true; do sleep 1000; done

View File

@ -92,6 +92,11 @@ create_admin_user() {
create_keytabs() {
# kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
# kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1@${REALM}"
@ -102,6 +107,10 @@ create_keytabs() {
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser/node1@${REALM}"
kadmin.local -q "addprinc -randkey hdfsuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser@${REALM}"
kadmin.local -q "addprinc -randkey root@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab root@${REALM}"
kadmin.local -q "addprinc -randkey specuser@${REALM}"
kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab specuser@${REALM}"
chmod g+r /tmp/keytab/clickhouse.keytab
}

View File

@ -8,7 +8,7 @@
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
renew_lifetime = 15d
# renew_lifetime = 15d
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
@ -16,10 +16,10 @@
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfs_kerberos
admin_server = hdfs_kerberos
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
TEST.CLICKHOUSE.TECH = TEST.CLICKHOUSE.TECH
.test.clickhouse.tech = TEST.CLICKHOUSE.TECH
test.clickhouse.tech = TEST.CLICKHOUSE.TECH

View File

@ -0,0 +1,25 @@
[logging]
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log
[libdefaults]
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15s
# renew_lifetime = 15d
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
permitted_enctypes = des3-hmac-sha1
[realms]
TEST.CLICKHOUSE.TECH = {
kdc = hdfskerberos
admin_server = hdfskerberos
}
[domain_realm]
.test.clickhouse.tech = TEST.CLICKHOUSE.TECH
test.clickhouse.tech = TEST.CLICKHOUSE.TECH

View File

@ -1,8 +1,5 @@
import time
import pytest
import requests
from tempfile import NamedTemporaryFile
from helpers.hdfs_api import HDFSApi
import os
@ -10,7 +7,6 @@ from helpers.cluster import ClickHouseCluster
import subprocess
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml'])
@ -27,14 +23,110 @@ def started_cluster():
finally:
cluster.shutdown()
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
def test_read_table(started_cluster):
# hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function", data)
api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
print("api_read", api_read)
assert api_read == data
select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
print("select_read", select_read)
assert select_read == data
def test_read_write_storage(started_cluster):
# node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberized_hdfs1.test.clickhouse.tech:9000/simple_storage', 'TSV')")
node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')")
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
print("api_read", api_read)
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorage2")
print("select_read", select_read)
assert select_read == "1\tMark\t72.53\n"
def test_write_storage_expired(started_cluster):
node1.query("create table SimpleHDFSStorageExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_expired', 'TSV')")
time.sleep(45) # wait for ticket expiration
node1.query("insert into SimpleHDFSStorageExpired values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage_expired")
print("api_read", api_read)
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorageExpired")
print("select_read", select_read)
assert select_read == "1\tMark\t72.53\n"
def test_prohibited(started_cluster):
node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://specuser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
try:
node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)
def test_two_users(started_cluster):
node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
node1.query("insert into HDFSStorOne values (1, 'IlyaReal', 86.00)")
node1.query("create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://specuser@kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
node1.query("insert into HDFSStorTwo values (1, 'IlyaIdeal', 74.00)")
select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
print("select_read_1", select_read_1)
select_read_2 = node1.query("select * from hdfs('hdfs://specuser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
print("select_read_2", select_read_2)
node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
try:
node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex)
def test_cache_path(started_cluster):
node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
api_read = started_cluster.hdfs_api.read_data("/storage_dedicated_cache_path")
print("api_read", api_read)
assert api_read == "1\tFatMark\t92.53\n"
def test_read_table_not_expired(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)
started_cluster.pause_container('hdfskerberos')
time.sleep(45)
try:
select_read = node1.query("select * from hdfs('hdfs://reloginuser@kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
assert False, "Exception has to be thrown"
except Exception as ex:
assert "DB::Exception: kinit failure:" in str(ex)
started_cluster.unpause_container('hdfskerberos')
@pytest.mark.timeout(999999)
def _test_sleep_forever(started_cluster):
time.sleep(999999)
if __name__ == '__main__':