Merge pull request #16621 from arenadata/ADQM-148

HDFS configuration and Kerberos Authentication
alesapin 2020-12-17 23:16:58 +03:00 committed by GitHub
commit 799997db35
37 changed files with 1332 additions and 246 deletions

contrib/libgsasl vendored

@ -1 +1 @@
Subproject commit 140fb58250588c8323285b75fcf127c4adc33dfa
Subproject commit 383ee28e82f69fa16ed43b48bd9c8ee5b313ab84

contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit 30552ac527f2c14070d834e171493b2e7f662375
Subproject commit 095b9d48b400abb72d967cb0539af13b1e3d90cf


@ -17,7 +17,12 @@ if (NOT USE_INTERNAL_PROTOBUF_LIBRARY AND PROTOBUF_OLD_ABI_COMPAT)
endif ()
endif()
set(WITH_KERBEROS false)
if (${ENABLE_LIBRARIES} AND ${ENABLE_KRB5})
SET(WITH_KERBEROS 1)
else()
SET(WITH_KERBEROS 0)
endif()
# project and source dir
set(HDFS3_ROOT_DIR ${ClickHouse_SOURCE_DIR}/contrib/libhdfs3)
set(HDFS3_SOURCE_DIR ${HDFS3_ROOT_DIR}/src)
@ -28,11 +33,6 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
include(Platform)
include(Options)
# prefer shared libraries
if (WITH_KERBEROS)
find_package(KERBEROS REQUIRED)
endif()
# source
set(PROTO_FILES
#${HDFS3_SOURCE_DIR}/proto/encryption.proto
@ -207,14 +207,11 @@ target_include_directories(hdfs3 PRIVATE ${HDFS3_COMMON_DIR})
target_include_directories(hdfs3 PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
target_include_directories(hdfs3 PRIVATE ${LIBGSASL_INCLUDE_DIR})
if (WITH_KERBEROS)
target_include_directories(hdfs3 PRIVATE ${KERBEROS_INCLUDE_DIRS})
endif()
target_include_directories(hdfs3 PRIVATE ${LIBXML2_INCLUDE_DIR})
target_link_libraries(hdfs3 PRIVATE ${LIBGSASL_LIBRARY})
if (WITH_KERBEROS)
target_link_libraries(hdfs3 PRIVATE ${KERBEROS_LIBRARIES})
target_link_libraries(hdfs3 PRIVATE ${KRB5_LIBRARY})
endif()
target_link_libraries(hdfs3 PRIVATE ${LIBXML2_LIBRARIES})

debian/control vendored

@ -40,7 +40,7 @@ Description: Common files for ClickHouse
Package: clickhouse-server
Architecture: all
Depends: ${shlibs:Depends}, ${misc:Depends}, clickhouse-common-static (= ${binary:Version}), adduser
Recommends: libcap2-bin
Recommends: libcap2-bin, krb5-user
Replaces: clickhouse-server-common, clickhouse-server-base
Provides: clickhouse-server-common
Description: Server binary for ClickHouse


@ -158,5 +158,9 @@
"name": "yandex/clickhouse-stateless-unbundled-test",
"dependent": [
]
},
"docker/test/integration/kerberized_hadoop": {
"name": "yandex/clickhouse-kerberized-hadoop",
"dependent": []
}
}


@ -0,0 +1,18 @@
# docker build -t yandex/clickhouse-kerberized-hadoop .
FROM sequenceiq/hadoop-docker:2.7.0
RUN sed -i -e 's/^\#baseurl/baseurl/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's/^mirrorlist/#mirrorlist/' /etc/yum.repos.d/CentOS-Base.repo
RUN sed -i -e 's#http://mirror.centos.org/#http://vault.centos.org/#' /etc/yum.repos.d/CentOS-Base.repo
RUN yum clean all && \
rpm --rebuilddb && \
yum -y update && \
yum -y install yum-plugin-ovl && \
yum --quiet -y install krb5-workstation.x86_64
RUN cd /tmp && \
curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
tar xzf commons-daemon-1.0.15-src.tar.gz && \
cd commons-daemon-1.0.15-src/src/native/unix && \
./configure && \
make && \
cp ./jsvc /usr/local/hadoop/sbin


@ -29,6 +29,8 @@ RUN apt-get update \
libcurl4-openssl-dev \
gdb \
software-properties-common \
libkrb5-dev \
krb5-user \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
@ -75,7 +77,8 @@ RUN python3 -m pip install \
pytest-timeout \
redis \
tzlocal \
urllib3
urllib3 \
requests-kerberos
COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/


@ -2,6 +2,7 @@ version: '2.3'
services:
hdfs1:
image: sequenceiq/hadoop-docker:2.7.0
hostname: hdfs1
restart: always
ports:
- 50075:50075


@ -0,0 +1,29 @@
version: '2.3'
services:
kerberizedhdfs1:
cap_add:
- DAC_READ_SEARCH
image: yandex/clickhouse-kerberized-hadoop:16621
hostname: kerberizedhdfs1
restart: always
volumes:
- ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro
- ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
- ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro
ports:
- 1006:1006
- 50070:50070
- 9000:9000
depends_on:
- hdfskerberos
entrypoint: /etc/bootstrap.sh -d
hdfskerberos:
image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
hostname: hdfskerberos
volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab
- ${KERBERIZED_HDFS_DIR}/../../kerberos_image_config.sh:/config.sh
- /dev/urandom:/dev/random
ports: [88, 749]


@ -108,6 +108,95 @@ Create table with files named `file000`, `file001`, … , `file999`:
``` sql
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## Configuration {#configuration}
Similar to GraphiteMergeTree, the HDFS engine supports extended configuration using the ClickHouse config file. There are two configuration keys that you can use: global (`hdfs`) and user-level (`hdfs_*`). The global configuration is applied first, and then the user-level configuration is applied (if it exists).
``` xml
<!-- Global configuration options for HDFS engine type -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- Configuration specific for user "root" -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
```
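Any parameter from the tables below can also be placed in these sections; based on the `loadFromConfig` implementation added in this PR, underscores in the option names are converted to dots before the values are forwarded to libhdfs3. A minimal sketch (the timeout value is purely illustrative):
``` xml
<hdfs>
    <!-- forwarded to libhdfs3 as rpc.client.connect.timeout -->
    <rpc_client_connect_timeout>250</rpc_client_connect_timeout>
</hdfs>
```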
### List of possible configuration options with default values
#### Supported by libhdfs3
| **parameter**                                         | **default value**       |
|-------------------------------------------------------|-------------------------|
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
| input\_notretry-another-node | false |
| input\_localread\_mappedfile | true |
| dfs\_client\_use\_legacy\_blockreader\_local | false |
| rpc\_client\_ping\_interval | 10 * 1000 |
| rpc\_client\_connect\_timeout | 600 * 1000 |
| rpc\_client\_read\_timeout | 3600 * 1000 |
| rpc\_client\_write\_timeout | 3600 * 1000 |
| rpc\_client\_socekt\_linger\_timeout | -1 |
| rpc\_client\_connect\_retry | 10 |
| rpc\_client\_timeout | 3600 * 1000 |
| dfs\_default\_replica | 3 |
| input\_connect\_timeout | 600 * 1000 |
| input\_read\_timeout | 3600 * 1000 |
| input\_write\_timeout | 3600 * 1000 |
| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
| dfs\_prefetchsize | 10 |
| input\_read\_getblockinfo\_retry | 3 |
| input\_localread\_blockinfo\_cachesize | 1000 |
| input\_read\_max\_retry | 60 |
| output\_default\_chunksize | 512 |
| output\_default\_packetsize | 64 * 1024 |
| output\_default\_write\_retry | 10 |
| output\_connect\_timeout | 600 * 1000 |
| output\_read\_timeout | 3600 * 1000 |
| output\_write\_timeout | 3600 * 1000 |
| output\_close\_timeout | 3600 * 1000 |
| output\_packetpool\_size | 1024 |
| output\_heeartbeat\_interval | 10 * 1000 |
| dfs\_client\_failover\_max\_attempts | 15 |
| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
| dfs\_client\_socketcache\_expiryMsec | 3000 |
| dfs\_client\_socketcache\_capacity | 16 |
| dfs\_default\_blocksize | 64 * 1024 * 1024 |
| dfs\_default\_uri | "hdfs://localhost:9000" |
| hadoop\_security\_authentication | "simple" |
| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
| dfs\_client\_log\_severity | "INFO" |
| dfs\_domain\_socket\_path | "" |
The [HDFS Configuration Reference](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) may explain some of these parameters.
#### ClickHouse extras {#clickhouse-extras}
| **parameter**                      | **default value** |
|------------------------------------|-------------------|
| hadoop\_kerberos\_keytab           | ""                |
| hadoop\_kerberos\_principal        | ""                |
| hadoop\_kerberos\_kinit\_command   | kinit             |
#### Limitations {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path can only be set globally, not per user
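For example, a minimal sketch (the cache path is an illustrative value) where the setting appears only in the global section:
``` xml
<hdfs>
    <hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs>
```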
## Kerberos support {#kerberos-support}
If the hadoop\_security\_authentication parameter has the value 'kerberos', ClickHouse authenticates via Kerberos.
The parameters listed [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help.
Note that due to libhdfs3 limitations only the old-fashioned approach is supported: datanode communication is not secured by SASL (the presence of HADOOP\_SECURE\_DN\_USER is a reliable indicator of this approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
If hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal or hadoop\_kerberos\_kinit\_command is specified, kinit will be invoked. hadoop\_kerberos\_keytab and hadoop\_kerberos\_principal are mandatory in this case; the kinit tool and krb5 configuration files are required.
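As a sketch, a configuration like the following (the keytab path and principal are illustrative values borrowed from the integration tests, not defaults) makes ClickHouse run roughly `kinit -R -t /tmp/keytab/clickhouse.keytab -k root@TEST.CLICKHOUSE.TECH` before connecting:
``` xml
<hdfs>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
    <!-- optional; defaults to plain kinit resolved via PATH -->
    <hadoop_kerberos_kinit_command>kinit</hadoop_kerberos_kinit_command>
</hdfs>
```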
## Virtual Columns {#virtual-columns}


@ -88,6 +88,10 @@ if (USE_AWS_S3)
add_headers_and_sources(dbms Disks/S3)
endif()
if (USE_HDFS)
add_headers_and_sources(dbms Storages/HDFS)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
@ -389,8 +393,8 @@ if (USE_GRPC)
endif()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY})
dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_AWS_S3)


@ -5,6 +5,7 @@
#cmakedefine01 USE_RE2_ST
#cmakedefine01 USE_SSL
#cmakedefine01 USE_HDFS
#cmakedefine01 USE_INTERNAL_HDFS3_LIBRARY
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND


@ -1,62 +0,0 @@
#include <IO/HDFSCommon.h>
#include <Poco/URI.h>
#if USE_HDFS
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str)
{
const Poco::URI uri(uri_str);
const auto & host = uri.getHost();
auto port = uri.getPort();
const std::string path = "//";
if (host.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderPtr builder(hdfsNewBuilder());
if (builder == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
std::string user_info = uri.getUserInfo();
if (!user_info.empty() && user_info.front() != ':')
{
std::string user;
size_t delim_pos = user_info.find(':');
if (delim_pos != std::string::npos)
user = user_info.substr(0, delim_pos);
else
user = user_info;
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
if (port != 0)
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
return builder;
}
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif


@ -1,58 +0,0 @@
#pragma once
#include <Common/config.h>
#include <memory>
#include <type_traits>
#if USE_HDFS
#include <hdfs/hdfs.h>
namespace DB
{
namespace detail
{
struct HDFSBuilderDeleter
{
void operator()(hdfsBuilder * builder_ptr)
{
hdfsFreeBuilder(builder_ptr);
}
};
struct HDFSFsDeleter
{
void operator()(hdfsFS fs_ptr)
{
hdfsDisconnect(fs_ptr);
}
};
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderPtr createHDFSBuilder(const std::string & uri_str);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif


@ -0,0 +1,194 @@
#include <Storages/HDFS/HDFSCommon.h>
#include <Poco/URI.h>
#include <boost/algorithm/string/replace.hpp>
#if USE_HDFS
#include <Common/ShellCommand.h>
#include <Common/Exception.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
extern const int NO_ELEMENTS_IN_CONFIG;
}
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & config_path, bool isUser)
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_path, keys);
for (const auto & key : keys)
{
const String key_path = config_path + "." + key;
String key_name;
if (key == "hadoop_kerberos_keytab")
{
need_kinit = true;
hadoop_kerberos_keytab = config.getString(key_path);
continue;
}
else if (key == "hadoop_kerberos_principal")
{
need_kinit = true;
hadoop_kerberos_principal = config.getString(key_path);
#if USE_INTERNAL_HDFS3_LIBRARY
hdfsBuilderSetPrincipal(hdfs_builder, hadoop_kerberos_principal.c_str());
#endif
continue;
}
else if (key == "hadoop_kerberos_kinit_command")
{
need_kinit = true;
hadoop_kerberos_kinit_command = config.getString(key_path);
continue;
}
else if (key == "hadoop_security_kerberos_ticket_cache_path")
{
if (isUser)
{
throw Exception("hadoop.security.kerberos.ticket.cache.path cannot be set per user",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
}
hadoop_security_kerberos_ticket_cache_path = config.getString(key_path);
// standard param - pass further
}
key_name = boost::replace_all_copy(key, "_", ".");
const auto & [k,v] = keep(key_name, config.getString(key_path));
hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
}
}
String HDFSBuilderWrapper::getKinitCmd()
{
if (hadoop_kerberos_keytab.empty() || hadoop_kerberos_principal.empty())
{
throw Exception("Not enough parameters to run kinit",
ErrorCodes::NO_ELEMENTS_IN_CONFIG);
}
WriteBufferFromOwnString ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
String() :
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
// command to run looks like
// kinit -R -t /keytab_dir/clickhouse.keytab -k somebody@TEST.CLICKHOUSE.TECH || ..
ss << hadoop_kerberos_kinit_command << cache_name <<
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
return ss.str();
}
void HDFSBuilderWrapper::runKinit()
{
String cmd = getKinitCmd();
LOG_DEBUG(&Poco::Logger::get("HDFSClient"), "running kinit: {}", cmd);
std::unique_lock<std::mutex> lck(kinit_mtx);
auto command = ShellCommand::execute(cmd);
auto status = command->tryWait();
if (status)
{
throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
}
}
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration & config)
{
const Poco::URI uri(uri_str);
const auto & host = uri.getHost();
auto port = uri.getPort();
const String path = "//";
if (host.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderWrapper builder;
if (builder.get() == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " +
uri.toString() + " " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
String user_info = uri.getUserInfo();
String user;
if (!user_info.empty() && user_info.front() != ':')
{
size_t delim_pos = user_info.find(':');
if (delim_pos != String::npos)
user = user_info.substr(0, delim_pos);
else
user = user_info;
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
if (port != 0)
{
hdfsBuilderSetNameNodePort(builder.get(), port);
}
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
}
if (!user.empty())
{
String user_config_prefix = HDFSBuilderWrapper::CONFIG_PREFIX + "_" + user;
if (config.has(user_config_prefix))
{
#if USE_INTERNAL_HDFS3_LIBRARY
builder.loadFromConfig(config, user_config_prefix, true);
#else
throw Exception("Multi user HDFS configuration required internal libhdfs3",
ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
#endif
}
}
if (builder.need_kinit)
{
builder.runKinit();
}
return builder;
}
std::mutex HDFSBuilderWrapper::kinit_mtx;
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif


@ -0,0 +1,114 @@
#pragma once
#include <Common/config.h>
#if USE_HDFS
#include <memory>
#include <type_traits>
#include <vector>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <mutex>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{
namespace detail
{
struct HDFSFsDeleter
{
void operator()(hdfsFS fs_ptr)
{
hdfsDisconnect(fs_ptr);
}
};
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
class HDFSBuilderWrapper
{
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
std::vector<std::pair<String, String>> config_stor;
// the hdfs builder relies on external storage for config strings, so keep them alive here
std::pair<String, String>& keep(const String & k, const String & v)
{
return config_stor.emplace_back(std::make_pair(k, v));
}
bool need_kinit{false};
static const String CONFIG_PREFIX;
private:
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
String getKinitCmd();
void runKinit();
public:
hdfsBuilder *
get()
{
return hdfs_builder;
}
HDFSBuilderWrapper()
: hdfs_builder(hdfsNewBuilder())
{
}
~HDFSBuilderWrapper()
{
hdfsFreeBuilder(hdfs_builder);
}
HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
}
#endif


@ -1,7 +1,7 @@
#include "ReadBufferFromHDFS.h"
#if USE_HDFS
#include <IO/HDFSCommon.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <hdfs/hdfs.h>
#include <mutex>
@ -23,15 +23,16 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
std::string hdfs_uri;
hdfsFile fin;
HDFSBuilderPtr builder;
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
: hdfs_uri(hdfs_name_)
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_),
builder(createHDFSBuilder(hdfs_uri, config_))
{
std::lock_guard lock(hdfs_init_mutex);
builder = createHDFSBuilder(hdfs_uri);
fs = createHDFSFS(builder.get());
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
@ -58,11 +59,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
}
};
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<ReadBuffer>(buf_size)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
{
}


@ -8,6 +8,13 @@
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <Interpreters/Context.h>
namespace DB
{
/** Accepts HDFS path to file and opens it.
@ -18,7 +25,7 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
struct ReadBufferFromHDFSImpl;
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
~ReadBufferFromHDFS() override;
bool nextImpl() override;


@ -3,15 +3,15 @@
#if USE_HDFS
#include <Storages/StorageFactory.h>
#include <Storages/StorageHDFS.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Interpreters/Context.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Parsers/ASTLiteral.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <IO/WriteHelpers.h>
#include <IO/HDFSCommon.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataTypes/DataTypeString.h>
#include <DataStreams/IBlockOutputStream.h>
@ -121,7 +121,7 @@ public:
current_path = uri + path;
auto compression = chooseCompressionMethod(path, compression_method);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path), compression);
auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, context.getGlobalContext().getConfigRef()), compression);
auto input_stream = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<OwningBlockInputStream<ReadBuffer>>(input_stream, std::move(read_buf));
@ -180,7 +180,7 @@ public:
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri), compression_method, 3);
write_buf = wrapWriteBufferWithCompressionMethod(std::make_unique<WriteBufferFromHDFS>(uri, context.getGlobalContext().getConfigRef()), compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@ -274,7 +274,7 @@ Pipe StorageHDFS::read(
const String path_from_uri = uri.substr(begin_of_path);
const String uri_without_path = uri.substr(0, begin_of_path);
HDFSBuilderPtr builder = createHDFSBuilder(uri_without_path + "/");
HDFSBuilderWrapper builder = createHDFSBuilder(uri_without_path + "/", context_.getGlobalContext().getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();


@ -2,8 +2,9 @@
#if USE_HDFS
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <Interpreters/Context.h>
#include <Storages/HDFS/WriteBufferFromHDFS.h>
#include <Storages/HDFS/HDFSCommon.h>
#include <hdfs/hdfs.h>
@ -23,12 +24,12 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
std::string hdfs_uri;
hdfsFile fout;
HDFSBuilderPtr builder;
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
explicit WriteBufferFromHDFSImpl(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_)
, builder(createHDFSBuilder(hdfs_uri))
, builder(createHDFSBuilder(hdfs_uri, config_))
, fs(createHDFSFS(builder.get()))
{
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
@ -72,9 +73,9 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
}
};
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration & config_, size_t buf_size_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, config_))
{
}


@ -18,7 +18,7 @@ class WriteBufferFromHDFS : public BufferWithOwnMemory<WriteBuffer>
struct WriteBufferFromHDFSImpl;
std::unique_ptr<WriteBufferFromHDFSImpl> impl;
public:
WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
WriteBufferFromHDFS(WriteBufferFromHDFS &&) = default;
void nextImpl() override;


@ -2,7 +2,7 @@
#include "registerTableFunctions.h"
#if USE_HDFS
#include <Storages/StorageHDFS.h>
#include <Storages/HDFS/StorageHDFS.h>
#include <Storages/ColumnsDescription.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <TableFunctions/TableFunctionHDFS.h>


@ -38,7 +38,8 @@ sudo -H pip install \
pytest-timeout \
redis \
tzlocal \
urllib3
urllib3 \
requests-kerberos
```
(strongly discouraged) If you really want to use OS packages on modern Debian/Ubuntu instead of "pip": `sudo apt install -y docker docker-compose python3-pytest python3-dicttoxml python3-docker python3-pymysql python3-pymongo python3-tzlocal python3-kazoo python3-psycopg2 kafka-python python3-pytest-timeout python3-minio`


@ -137,6 +137,7 @@ class ClickHouseCluster:
self.with_rabbitmq = False
self.with_odbc_drivers = False
self.with_hdfs = False
self.with_kerberized_hdfs = False
self.with_mongo = False
self.with_net_trics = False
self.with_redis = False
@ -172,7 +173,7 @@ class ClickHouseCluster:
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_mongo=False,
with_odbc_drivers=False, with_postgres=False, with_hdfs=False, with_kerberized_hdfs=False, with_mongo=False,
with_redis=False, with_minio=False, with_cassandra=False,
hostname=None, env_variables=None, image="yandex/clickhouse-integration-test", tag=None,
stay_alive=False, ipv4_address=None, ipv6_address=None, with_installed_binary=False, tmpfs=None,
@ -210,6 +211,7 @@ class ClickHouseCluster:
with_kafka=with_kafka,
with_kerberized_kafka=with_kerberized_kafka,
with_rabbitmq=with_rabbitmq,
with_kerberized_hdfs=with_kerberized_hdfs,
with_mongo=with_mongo,
with_redis=with_redis,
with_minio=with_minio,
@ -315,6 +317,14 @@ class ClickHouseCluster:
p.join(docker_compose_yml_dir, 'docker_compose_hdfs.yml')]
cmds.append(self.base_hdfs_cmd)
if with_kerberized_hdfs and not self.with_kerberized_hdfs:
self.with_kerberized_hdfs = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
self.base_kerberized_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
self.project_name, '--file',
p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
cmds.append(self.base_kerberized_hdfs_cmd)
if with_mongo and not self.with_mongo:
self.with_mongo = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_mongo.yml')])
@ -476,12 +486,35 @@ class ClickHouseCluster:
raise Exception("Cannot wait ZooKeeper container")
def make_hdfs_api(self, timeout=60, kerberized=False):
if kerberized:
keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
# print("kerberizedhdfs1 ip ", hdfs_ip)
kdc_ip = self.get_instance_ip('hdfskerberos')
# print("kdc_ip ", kdc_ip)
self.hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
host="kerberizedhdfs1",
protocol="http",
proxy_port=50070,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
else:
self.hdfs_api = HDFSApi(user="root", host="hdfs1")
def wait_hdfs_to_start(self, timeout=60):
hdfs_api = HDFSApi("root")
start = time.time()
while time.time() - start < timeout:
try:
hdfs_api.write_data("/somefilewithrandomname222", "1")
self.hdfs_api.write_data("/somefilewithrandomname222", "1")
print("Connected to HDFS and SafeMode disabled! ")
return
except Exception as ex:
@ -620,6 +653,7 @@ class ClickHouseCluster:
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
print('Setup kerberized kafka')
env = os.environ.copy()
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
@ -631,8 +665,17 @@ class ClickHouseCluster:
if self.with_hdfs and self.base_hdfs_cmd:
print('Setup HDFS')
subprocess_check_call(self.base_hdfs_cmd + common_opts)
self.make_hdfs_api()
self.wait_hdfs_to_start(120)
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
print('Setup kerberized HDFS')
env = os.environ.copy()
env['KERBERIZED_HDFS_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env)
self.make_hdfs_api(kerberized=True)
self.wait_hdfs_to_start(timeout=300)
if self.with_mongo and self.base_mongo_cmd:
print('Setup Mongo')
subprocess_check_call(self.base_mongo_cmd + common_opts)
@ -840,8 +883,8 @@ class ClickHouseInstance:
def __init__(
self, cluster, base_path, name, base_config_dir, custom_main_configs, custom_user_configs,
custom_dictionaries,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_mongo,
with_redis, with_minio,
macros, with_zookeeper, zookeeper_config_path, with_mysql, with_kafka, with_kerberized_kafka, with_rabbitmq, with_kerberized_hdfs,
with_mongo, with_redis, with_minio,
with_cassandra, server_bin_path, odbc_bridge_bin_path, clickhouse_path_dir, with_odbc_drivers,
hostname=None, env_variables=None,
image="yandex/clickhouse-integration-test", tag="latest",
@ -871,6 +914,7 @@ class ClickHouseInstance:
self.with_kafka = with_kafka
self.with_kerberized_kafka = with_kerberized_kafka
self.with_rabbitmq = with_rabbitmq
self.with_kerberized_hdfs = with_kerberized_hdfs
self.with_mongo = with_mongo
self.with_redis = with_redis
self.with_minio = with_minio
@ -885,7 +929,7 @@ class ClickHouseInstance:
else:
self.odbc_ini_path = ""
if with_kerberized_kafka:
if with_kerberized_kafka or with_kerberized_hdfs:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
@ -1231,7 +1275,7 @@ class ClickHouseInstance:
if self.with_zookeeper:
shutil.copy(self.zookeeper_config_path, conf_d_dir)
if self.with_kerberized_kafka:
if self.with_kerberized_kafka or self.with_kerberized_hdfs:
shutil.copytree(self.kerberos_secrets_dir, p.abspath(p.join(self.path, 'secrets')))
# Copy config.d configs
@ -1272,6 +1316,9 @@ class ClickHouseInstance:
if self.with_kerberized_kafka:
depends_on.append("kerberized_kafka1")
if self.with_kerberized_hdfs:
depends_on.append("kerberizedhdfs1")
if self.with_rabbitmq:
depends_on.append("rabbitmq1")


@ -2,45 +2,145 @@
import io
import gzip
import subprocess
import time
from tempfile import NamedTemporaryFile
import requests
import requests_kerberos as reqkerb
import socket
import tempfile
import logging
import os
g_dns_hook = None
def custom_getaddrinfo(*args):
# print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
ret = g_dns_hook.custom_getaddrinfo(*args)
# print("g_dns_hook.custom_getaddrinfo result", ret)
return ret
class mk_krb_conf(object):
def __init__(self, krb_conf, kdc_ip):
self.krb_conf = krb_conf
self.kdc_ip = kdc_ip
self.amended_krb_conf = None
def __enter__(self):
with open(self.krb_conf) as f:
content = f.read()
amended_content = content.replace('hdfskerberos', self.kdc_ip)
self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False, mode="w+")
self.amended_krb_conf.write(amended_content)
self.amended_krb_conf.close()
return self.amended_krb_conf.name
def __exit__(self, type, value, traceback):
if self.amended_krb_conf is not None:
self.amended_krb_conf.close()
# tweak DNS resolution so that requests addressed to the API host in the URL actually connect to localhost
class dns_hook(object):
def __init__(self, hdfs_api):
# print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
self.hdfs_api = hdfs_api
def __enter__(self):
global g_dns_hook
g_dns_hook = self
# print("g_dns_hook is None ", g_dns_hook is None)
self.original_getaddrinfo = socket.getaddrinfo
socket.getaddrinfo = custom_getaddrinfo
return self
def __exit__(self, type, value, traceback):
global g_dns_hook
g_dns_hook = None
socket.getaddrinfo = self.original_getaddrinfo
def custom_getaddrinfo(self, *args):
(hostname, port) = args[:2]
# print("top of custom_getaddrinfo", hostname, port)
if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
# print("dns_hook substitute")
return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))]
else:
return self.original_getaddrinfo(*args)
class HDFSApi(object):
def __init__(self, user):
self.host = "localhost"
self.http_proxy_port = "50070"
self.http_data_port = "50075"
def __init__(self, user, timeout=100, kerberized=False, principal=None,
keytab=None, krb_conf=None,
host = "localhost", protocol = "http",
proxy_port = 50070, data_port = 50075, hdfs_ip = None, kdc_ip = None):
self.host = host
self.protocol = protocol
self.proxy_port = proxy_port
self.data_port = data_port
self.user = user
self.kerberized = kerberized
self.principal = principal
self.keytab = keytab
self.timeout = timeout
self.hdfs_ip = hdfs_ip
self.kdc_ip = kdc_ip
self.krb_conf = krb_conf
# logging.basicConfig(level=logging.DEBUG)
# logging.getLogger().setLevel(logging.DEBUG)
# requests_log = logging.getLogger("requests.packages.urllib3")
# requests_log.setLevel(logging.DEBUG)
# requests_log.propagate = True
if kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
#principal=self.principal,
#hostname_override=self.host, principal=self.principal)
# , mutual_authentication=reqkerb.REQUIRED, force_preemptive=True)
else:
self.kerberos_auth = None
def _run_kinit(self):
if self.principal is None or self.keytab is None:
raise Exception("kerberos principal and keytab are required")
with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
# print("instantiated_krb_conf ", instantiated_krb_conf)
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
# print(cmd)
start = time.time()
while time.time() - start < self.timeout:
try:
subprocess.call(cmd, shell=True)
print("KDC started, kinit successfully run")
return
except Exception as ex:
print("Can't run kinit ... waiting {}".format(str(ex)))
time.sleep(1)
raise Exception("Kinit running failure")
def read_data(self, path, universal_newlines=True):
response = requests.get(
"http://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(host=self.host, port=self.http_proxy_port,
path=path), allow_redirects=False)
with dns_hook(self):
response = requests.get("{protocol}://{host}:{port}/webhdfs/v1{path}?op=OPEN".format(protocol=self.protocol, host=self.host, port=self.proxy_port, path=path), headers={'host': 'localhost'}, allow_redirects=False, verify=False, auth=self.kerberos_auth)
if response.status_code != 307:
response.raise_for_status()
additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
response_data = requests.get(
"http://{host}:{port}/webhdfs/v1{path}?op=OPEN&{params}".format(host=self.host, port=self.http_data_port,
path=path, params=additional_params))
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'])
# print("redirected to ", url)
with dns_hook(self):
response_data = requests.get(url,
headers={'host': 'localhost'},
verify=False, auth=self.kerberos_auth)
if response_data.status_code != 200:
response_data.raise_for_status()
if universal_newlines:
return response_data.text
else:
return response_data.content
# Requests can't put file
def _curl_to_put(self, filename, path, params):
url = "http://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(host=self.host,
port=self.http_data_port, path=path,
params=params)
cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
output = subprocess.check_output(cmd, shell=True, universal_newlines=True)
return output
def write_data(self, path, content):
named_file = NamedTemporaryFile(mode='wb+')
fpath = named_file.name
@ -48,19 +148,44 @@ class HDFSApi(object):
content = content.encode()
named_file.write(content)
named_file.flush()
response = requests.put(
"http://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(host=self.host, port=self.http_proxy_port,
path=path, user=self.user),
allow_redirects=False
)
if self.kerberized:
self._run_kinit()
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
# print(self.kerberos_auth)
with dns_hook(self):
response = requests.put(
"{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE".format(protocol=self.protocol, host=self.host,
port=self.proxy_port,
path=path, user=self.user),
allow_redirects=False,
headers={'host': 'localhost'},
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
if response.status_code != 307:
# print(response.headers)
response.raise_for_status()
additional_params = '&'.join(
response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
output = self._curl_to_put(fpath, path, additional_params)
if "201 Created" not in output:
raise Exception("Can't create file on hdfs:\n {}".format(output))
with dns_hook(self), open(fpath, mode="rb") as fh:
file_data = fh.read()
protocol = "http" # self.protocol
response = requests.put(
"{location}".format(location=response.headers['Location']),
data=file_data,
headers={'content-type':'text/plain', 'host': 'localhost'},
params={'file': path, 'user.name' : self.user},
allow_redirects=False, verify=False, auth=self.kerberos_auth
)
# print(response)
if response.status_code != 201:
response.raise_for_status()
def write_gzip_data(self, path, content):
if isinstance(content, str):


@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.hdfs_api import HDFSApi
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config_with_hosts.xml'])
@ -101,9 +100,8 @@ def test_table_function_remote(start_cluster):
def test_redirect(start_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\t\n")
assert hdfs_api.read_data("/simple_storage") == "1\t\n"
start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n")
assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n"
node7.query(
"CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)")
assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1")


@ -17,9 +17,8 @@ def started_cluster():
def test_url_without_redirect(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access datanode port directly
node1.query(
@ -28,9 +27,8 @@ def test_url_without_redirect(started_cluster):
def test_url_with_redirect_not_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port without allowing redirects
node1.query(
@ -40,9 +38,8 @@ def test_url_with_redirect_not_allowed(started_cluster):
def test_url_with_redirect_allowed(started_cluster):
hdfs_api = HDFSApi("root")
hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port with allowing redirects
# http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0


@ -24,18 +24,14 @@ def started_cluster():
def test_read_write_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/simple_storage', 'TSV')")
node1.query("insert into SimpleHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from SimpleHDFSStorage") == "1\tMark\t72.53\n"
def test_read_write_storage_with_globs(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table HDFSStorageWithRange (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage{1..5}', 'TSV')")
node1.query(
@ -46,8 +42,8 @@ def test_read_write_storage_with_globs(started_cluster):
"create table HDFSStorageWithAsterisk (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage*', 'TSV')")
for i in ["1", "2", "3"]:
hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
started_cluster.hdfs_api.write_data("/storage" + i, i + "\tMark\t72.53\n")
assert started_cluster.hdfs_api.read_data("/storage" + i) == i + "\tMark\t72.53\n"
assert node1.query("select count(*) from HDFSStorageWithRange") == "3\n"
assert node1.query("select count(*) from HDFSStorageWithEnum") == "3\n"
@ -77,25 +73,23 @@ def test_read_write_storage_with_globs(started_cluster):
def test_read_write_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
hdfs_api.write_data("/simple_table_function", data)
started_cluster.hdfs_api.write_data("/simple_table_function", data)
assert hdfs_api.read_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_write_table(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table OtherHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/other_storage', 'TSV')")
node1.query("insert into OtherHDFSStorage values (10, 'tomas', 55.55), (11, 'jack', 32.54)")
result = "10\ttomas\t55.55\n11\tjack\t32.54\n"
assert hdfs_api.read_data("/other_storage") == result
assert started_cluster.hdfs_api.read_data("/other_storage") == result
assert node1.query("select * from OtherHDFSStorage order by id") == result
@ -120,15 +114,14 @@ def test_bad_hdfs_uri(started_cluster):
print(ex)
assert "Unable to open HDFS file" in str(ex)
@pytest.mark.timeout(800)
def test_globs_in_read_table(started_cluster):
hdfs_api = HDFSApi("root")
some_data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
globs_dir = "/dir_for_test_with_globs/"
files = ["dir1/dir_dir/file1", "dir2/file2", "simple_table_function", "dir/file", "some_dir/dir1/file",
"some_dir/dir2/file", "some_dir/file", "table1_function", "table2_function", "table3_function"]
for filename in files:
hdfs_api.write_data(globs_dir + filename, some_data)
started_cluster.hdfs_api.write_data(globs_dir + filename, some_data)
test_requests = [("dir{1..5}/dir_dir/file1", 1, 1),
("*_table_functio?", 1, 1),
@ -145,6 +138,7 @@ def test_globs_in_read_table(started_cluster):
for pattern, paths_amount, files_amount in test_requests:
inside_table_func = "'hdfs://hdfs1:9000" + globs_dir + pattern + "', 'TSV', 'id UInt64, text String, number Float64'"
print("inside_table_func ", inside_table_func)
assert node1.query("select * from hdfs(" + inside_table_func + ")") == paths_amount * some_data
assert node1.query("select count(distinct _path) from hdfs(" + inside_table_func + ")").rstrip() == str(
paths_amount)
@ -153,64 +147,61 @@ def test_globs_in_read_table(started_cluster):
def test_read_write_gzip_table(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64')") == data
def test_read_write_gzip_table_with_parameter_gzip(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function", data)
assert hdfs_api.read_gzip_data("/simple_table_function") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64', 'gzip')") == data
def test_read_write_table_with_parameter_none(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_data("/simple_table_function.gz", data)
assert hdfs_api.read_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'none')") == data
def test_read_write_gzip_table_with_parameter_auto_gz(started_cluster):
hdfs_api = HDFSApi("root")
data = "1\tHello Jessica\t555.222\n2\tI rolled a joint\t777.333\n"
hdfs_api.write_gzip_data("/simple_table_function.gz", data)
started_cluster.hdfs_api.write_gzip_data("/simple_table_function.gz", data)
assert hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert started_cluster.hdfs_api.read_gzip_data("/simple_table_function.gz") == data
assert node1.query(
"select * from hdfs('hdfs://hdfs1:9000/simple_table_function.gz', 'TSV', 'id UInt64, text String, number Float64', 'auto')") == data
def test_write_gz_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/storage.gz', 'TSV')")
node1.query("insert into GZHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/storage.gz") == "1\tMark\t72.53\n"
assert node1.query("select * from GZHDFSStorage") == "1\tMark\t72.53\n"
def test_write_gzip_storage(started_cluster):
hdfs_api = HDFSApi("root")
node1.query(
"create table GZIPHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://hdfs1:9000/gzip_storage', 'TSV', 'gzip')")
node1.query("insert into GZIPHDFSStorage values (1, 'Mark', 72.53)")
assert hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
if __name__ == '__main__':
cluster.start()
input("Cluster created, press any key to destroy...")
cluster.shutdown()


@ -0,0 +1,13 @@
<yandex>
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<hdfs_suser>
<hadoop_kerberos_principal>specuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_suser>
<hdfs_dedicatedcachepath>
<hadoop_security_kerberos_ticket_cache_path>/tmp/kerb_cache</hadoop_security_kerberos_ticket_cache_path>
</hdfs_dedicatedcachepath>
</yandex>


@ -0,0 +1,11 @@
<yandex>
<logger>
<level>trace</level>
<log>/var/log/clickhouse-server/log.log</log>
<errorlog>/var/log/clickhouse-server/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/stdout.log</stdout>
</logger>
</yandex>


@ -0,0 +1,262 @@
#!/bin/bash
: "${HADOOP_PREFIX:=/usr/local/hadoop}"
cat >> $HADOOP_PREFIX/etc/hadoop/hadoop-env.sh <<EOF
export HADOOP_SECURE_DN_USER=hdfs
export HADOOP_SECURE_DN_PID_DIR=$HADOOP_PREFIX/pid
export HADOOP_SECURE_DN_LOG_DIR=$HADOOP_PREFIX/logs/hdfs
export JSVC_HOME=$HADOOP_PREFIX/sbin
EOF
$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
mkdir -p "${HADOOP_SECURE_DN_PID_DIR}"
mkdir -p "${HADOOP_SECURE_DN_LOG_DIR}"
rm /tmp/*.pid
# installing libraries if any - (resource urls added comma separated to the ACP system variable)
cd "${HADOOP_PREFIX}/share/hadoop/common" || exit
for cp in ${ACP//,/ }; do echo "== ${cp}"; curl -LO "${cp}" ; done;
cd - || exit
# altering the core-site configuration
sed "s/HOSTNAME/${HOSTNAME}/" /usr/local/hadoop/etc/hadoop/core-site.xml.template | grep -v '/configuration' > /usr/local/hadoop/etc/hadoop/core-site.xml
cat >> /usr/local/hadoop/etc/hadoop/core-site.xml << EOF
<property>
<name>hadoop.security.authentication</name>
<value>kerberos</value> <!-- A value of "simple" would disable security. -->
</property>
<property>
<name>hadoop.security.authorization</name>
<value>true</value>
</property>
<property>
<name>fs.defaultFS</name>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://kerberizedhdfs1:9000</value>
</property>
<!--
<property>
<name>hadoop.rpc.protection</name>
<value>privacy</value>
</property>
-->
</configuration>
EOF
cat > /usr/local/hadoop/etc/hadoop/hdfs-site.xml << EOF
<configuration>
<!--
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!-- General HDFS security config -->
<property>
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<!-- NameNode security config -->
<property>
<name>dfs.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Secondary NameNode security config -->
<property>
<name>dfs.secondary.namenode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.secondary.namenode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.secondary.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- DataNode security config
<property>
<name>dfs.data.transfer.protection</name>
<value>integrity</value>
</property>
-->
<property>
<name>dfs.datanode.data.dir.perm</name>
<value>700</value>
</property>
<property>
<name>dfs.datanode.address</name>
<value>0.0.0.0:1004</value>
</property>
<property>
<name>dfs.datanode.http.address</name>
<value>0.0.0.0:1006</value>
</property>
<!--
<property>
<name>dfs.http.policy</name>
<value>HTTPS_ONLY</value>
</property>
-->
<property>
<name>dfs.datanode.keytab.file</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
<property>
<name>dfs.datanode.kerberos.principal</name>
<value>hdfs/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<!-- Web Authentication config -->
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.encrypt.data.transfer</name>
<value>false</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.principal</name>
<value>HTTP/_HOST@TEST.CLICKHOUSE.TECH</value>
</property>
<property>
<name>dfs.web.authentication.kerberos.keytab</name>
<value>/usr/local/hadoop/etc/hadoop/conf/hdfs.keytab</value> <!-- path to the HDFS keytab -->
</property>
</configuration>
EOF
# cat > /usr/local/hadoop/etc/hadoop/ssl-server.xml << EOF
# <configuration>
# <property>
# <name>ssl.server.truststore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.truststore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.location</name>
# <value>/usr/local/hadoop/etc/hadoop/conf/hdfs.jks</value>
# </property>
# <property>
# <name>ssl.server.keystore.password</name>
# <value>masterkey</value>
# </property>
# <property>
# <name>ssl.server.keystore.keypassword</name>
# <value>masterkey</value>
# </property>
# </configuration>
# EOF
cat > /usr/local/hadoop/etc/hadoop/log4j.properties << EOF
# Set everything to be logged to the console
log4j.rootCategory=DEBUG, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Set the default spark-shell log level to WARN. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=INFO
# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=DEBUG
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR
# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
log4j.logger.org.apache.spark.deploy=DEBUG
log4j.logger.org.apache.spark.executor=DEBUG
log4j.logger.org.apache.spark.scheduler=DEBUG
EOF
useradd -u 1098 hdfs
# keytool -genkey -alias kerberized_hdfs1.test.clickhouse.tech -keyalg rsa -keysize 1024 -dname "CN=kerberized_hdfs1.test.clickhouse.tech" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
keytool -genkey -alias kerberizedhdfs1 -keyalg rsa -keysize 1024 -dname "CN=kerberizedhdfs1" -keypass masterkey -keystore /usr/local/hadoop/etc/hadoop/conf/hdfs.jks -storepass masterkey
chmod g+r /usr/local/hadoop/etc/hadoop/conf/hdfs.jks
service sshd start
# yum --quiet --assumeyes install krb5-workstation.x86_64
# yum --quiet --assumeyes install tcpdump
# cd /tmp
# curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz
# tar xzf commons-daemon-1.0.15-src.tar.gz
# cd commons-daemon-1.0.15-src/src/native/unix
# ./configure && make
# cp ./jsvc /usr/local/hadoop/sbin
until kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH; do sleep 2; done
echo "KDC is up and ready to go... starting up"
$HADOOP_PREFIX/sbin/start-dfs.sh
$HADOOP_PREFIX/sbin/start-yarn.sh
$HADOOP_PREFIX/sbin/mr-jobhistory-daemon.sh start historyserver
chmod a+r /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab # make the hdfs keytab readable for the non-root service user
$HADOOP_PREFIX/sbin/start-secure-dns.sh
sleep 3
/usr/local/hadoop/bin/hdfs dfsadmin -safemode leave
/usr/local/hadoop/bin/hdfs dfs -mkdir /user/specuser
/usr/local/hadoop/bin/hdfs dfs -chown specuser /user/specuser
kdestroy
# adduser --groups hdfs hdfsuser
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start namenode
# /usr/local/hadoop/sbin/hadoop-daemon.sh --config /usr/local/hadoop/etc/hadoop/ --script /usr/local/hadoop/sbin/hdfs start datanode
if [[ $1 == "-d" ]]; then
  while true; do sleep 1000; done
fi

if [[ $1 == "-bash" ]]; then
  /bin/bash
fi
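The tail of this script is the whole Kerberos dance in miniature: obtain a ticket for the HDFS service principal, start the daemons, make the keytab readable, then drop the ticket. If the container comes up but HDFS access still fails, the same keytab allows a manual check. A minimal sketch, assuming a shell inside the kerberized Hadoop container after start-up (the paths and principals are the ones created above):

    kinit -kt /usr/local/hadoop/etc/hadoop/conf/hdfs.keytab hdfs/kerberizedhdfs1@TEST.CLICKHOUSE.TECH
    klist                                   # the TGT for hdfs/kerberizedhdfs1 should be listed
    /usr/local/hadoop/bin/hdfs dfs -ls /    # succeeds only while a valid ticket sits in the cache
    /usr/local/hadoop/bin/hdfs dfs -ls /user/specuser
    kdestroy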

View File

@ -0,0 +1,139 @@
#!/bin/bash
set -x # trace
: "${REALM:=TEST.CLICKHOUSE.TECH}"
: "${DOMAIN_REALM:=test.clickhouse.tech}"
: "${KERB_MASTER_KEY:=masterkey}"
: "${KERB_ADMIN_USER:=admin}"
: "${KERB_ADMIN_PASS:=admin}"
create_config() {
: "${KDC_ADDRESS:=$(hostname -f)}"
cat>/etc/krb5.conf<<EOF
[logging]
default = FILE:/var/log/kerberos/krb5libs.log
kdc = FILE:/var/log/kerberos/krb5kdc.log
admin_server = FILE:/var/log/kerberos/kadmind.log
[libdefaults]
default_realm = $REALM
dns_lookup_realm = false
dns_lookup_kdc = false
ticket_lifetime = 15d
renew_lifetime = 15d
forwardable = true
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
default_tkt_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
default_tgs_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
permitted_enctypes = des-cbc-md5 des-cbc-crc des3-cbc-sha1
[realms]
$REALM = {
kdc = $KDC_ADDRESS
admin_server = $KDC_ADDRESS
}
[domain_realm]
.$DOMAIN_REALM = $REALM
$DOMAIN_REALM = $REALM
EOF
cat>/var/kerberos/krb5kdc/kdc.conf<<EOF
[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88
[realms]
$REALM = {
acl_file = /var/kerberos/krb5kdc/kadm5.acl
dict_file = /usr/share/dict/words
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab
# WARNING: We use weaker key types to simplify testing as stronger key types
# require the enhanced security JCE policy file to be installed. You should
# NOT run with this configuration in production or any real environment. You
# have been warned.
master_key_type = des3-hmac-sha1
supported_enctypes = arcfour-hmac:normal des3-hmac-sha1:normal des-cbc-crc:normal des:normal des:v4 des:norealm des:onlyrealm des:afs3
default_principal_flags = +preauth
}
EOF
}

create_db() {
  /usr/sbin/kdb5_util -P $KERB_MASTER_KEY -r $REALM create -s
}

start_kdc() {
  mkdir -p /var/log/kerberos

  /etc/rc.d/init.d/krb5kdc start
  /etc/rc.d/init.d/kadmin start

  chkconfig krb5kdc on
  chkconfig kadmin on
}

restart_kdc() {
  /etc/rc.d/init.d/krb5kdc restart
  /etc/rc.d/init.d/kadmin restart
}

create_admin_user() {
  kadmin.local -q "addprinc -pw $KERB_ADMIN_PASS $KERB_ADMIN_USER/admin"
  echo "*/admin@$REALM *" > /var/kerberos/krb5kdc/kadm5.acl
}

create_keytabs() {
  # kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
  # kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
  # kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"
  # kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1.${DOMAIN_REALM}@${REALM}"

  kadmin.local -q "addprinc -randkey hdfs/kerberizedhdfs1@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab hdfs/kerberizedhdfs1@${REALM}"

  kadmin.local -q "addprinc -randkey HTTP/kerberizedhdfs1@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/hdfs.keytab HTTP/kerberizedhdfs1@${REALM}"

  kadmin.local -q "addprinc -randkey hdfsuser/node1@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser/node1@${REALM}"

  kadmin.local -q "addprinc -randkey hdfsuser@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab hdfsuser@${REALM}"

  kadmin.local -q "addprinc -randkey root@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab root@${REALM}"

  kadmin.local -q "addprinc -randkey specuser@${REALM}"
  kadmin.local -q "ktadd -norandkey -k /tmp/keytab/clickhouse.keytab specuser@${REALM}"

  chmod g+r /tmp/keytab/clickhouse.keytab
}

main() {
  if [ ! -f /kerberos_initialized ]; then
    create_config
    create_db
    create_admin_user
    start_kdc
    touch /kerberos_initialized
  fi

  if [ ! -f /var/kerberos/krb5kdc/principal ]; then
    while true; do sleep 1000; done
  else
    start_kdc
    create_keytabs
    tail -F /var/log/kerberos/krb5kdc.log
  fi
}

[[ "$0" == "${BASH_SOURCE[0]}" ]] && main "$@"
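The keytabs written to /tmp/keytab are what the Hadoop and ClickHouse containers are expected to mount. A quick sanity check of the generated principals, sketched for a shell inside the KDC container once create_keytabs has run (klist -kt only reads the file; the kinit line actually asks the KDC for a ticket):

    klist -kt /tmp/keytab/hdfs.keytab          # expect hdfs/kerberizedhdfs1 and HTTP/kerberizedhdfs1 entries
    klist -kt /tmp/keytab/clickhouse.keytab    # expect hdfsuser, hdfsuser/node1, root and specuser entries
    kinit -kt /tmp/keytab/clickhouse.keytab specuser@TEST.CLICKHOUSE.TECH && klist
    kdestroy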

View File

@ -0,0 +1,24 @@
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = TEST.CLICKHOUSE.TECH
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 15s
 forwardable = true
 default_tgs_enctypes = des3-hmac-sha1
 default_tkt_enctypes = des3-hmac-sha1
 permitted_enctypes = des3-hmac-sha1

[realms]
 TEST.CLICKHOUSE.TECH = {
  kdc = hdfskerberos
  admin_server = hdfskerberos
 }

[domain_realm]
 .test.clickhouse.tech = TEST.CLICKHOUSE.TECH
 test.clickhouse.tech = TEST.CLICKHOUSE.TECH
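The 15-second ticket_lifetime in this variant is deliberately tiny, presumably so the expiry and re-login scenarios in the test suite below can be triggered inside a 45-second sleep. A small sketch of watching a ticket age out under this configuration; the keytab path is the one produced by the KDC bootstrap and is assumed to be mounted at the same location on the client:

    kinit -kt /tmp/keytab/clickhouse.keytab hdfsuser@TEST.CLICKHOUSE.TECH
    klist          # "Expires" should be roughly 15 seconds after "Valid starting"
    sleep 20
    klist          # the ticket is now past its expiry; HDFS access fails until the next kinit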

View File

@ -0,0 +1,24 @@
[logging]
 default = FILE:/var/log/krb5libs.log
 kdc = FILE:/var/log/krb5kdc.log
 admin_server = FILE:/var/log/kadmind.log

[libdefaults]
 default_realm = TEST.CLICKHOUSE.TECH
 dns_lookup_realm = false
 dns_lookup_kdc = false
 ticket_lifetime = 15d
 forwardable = true
 default_tgs_enctypes = des3-hmac-sha1
 default_tkt_enctypes = des3-hmac-sha1
 permitted_enctypes = des3-hmac-sha1

[realms]
 TEST.CLICKHOUSE.TECH = {
  kdc = hdfskerberos
  admin_server = hdfskerberos
 }

[domain_realm]
 .test.clickhouse.tech = TEST.CLICKHOUSE.TECH
 test.clickhouse.tech = TEST.CLICKHOUSE.TECH
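This second file is identical to the previous one except for the 15-day ticket_lifetime, so the two presumably end up in different containers of the test setup. When experimenting with both configurations on one machine, MIT Kerberos reads an alternative file through the KRB5_CONFIG environment variable; the file names below are illustrative, not taken from this diff:

    # request a ticket using the short-lifetime configuration explicitly
    KRB5_CONFIG=/path/to/short/krb5.conf kinit -kt /tmp/keytab/clickhouse.keytab hdfsuser@TEST.CLICKHOUSE.TECH
    klist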

View File

@ -0,0 +1,108 @@
import time
import pytest
import os
from helpers.cluster import ClickHouseCluster
import subprocess

cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml'])

@pytest.fixture(scope="module")
def started_cluster():
    try:
        cluster.start()
        yield cluster
    except Exception as ex:
        print(ex)
        raise ex
    finally:
        cluster.shutdown()

def test_read_table(started_cluster):
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    started_cluster.hdfs_api.write_data("/simple_table_function", data)

    api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
    assert api_read == data

    select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
    assert select_read == data

def test_read_write_storage(started_cluster):
    node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')")
    node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")

    api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
    assert api_read == "1\tMark\t72.53\n"

    select_read = node1.query("select * from SimpleHDFSStorage2")
    assert select_read == "1\tMark\t72.53\n"

def test_write_storage_not_expired(started_cluster):
    node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_not_expired', 'TSV')")

    time.sleep(45)  # wait for ticket expiration
    node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")

    api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired")
    assert api_read == "1\tMark\t72.53\n"

    select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
    assert select_read == "1\tMark\t72.53\n"

def test_two_users(started_cluster):
    node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
    node1.query("insert into HDFSStorOne values (1, 'Real', 86.00)")

    # the 'suser@' part of the URL selects a separate per-user HDFS config, i.e. a different Kerberos identity
    node1.query("create table HDFSStorTwo (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
    node1.query("insert into HDFSStorTwo values (1, 'Ideal', 74.00)")

    select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
    select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")

def test_read_table_expired(started_cluster):
    data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
    started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)

    # with the KDC container paused, the ticket cannot be renewed, so the query below must fail
    started_cluster.pause_container('hdfskerberos')
    time.sleep(45)

    try:
        select_read = node1.query("select * from hdfs('hdfs://reloginuser&kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
        assert False, "Exception have to be thrown"
    except Exception as ex:
        assert "DB::Exception: kinit failure:" in str(ex)

    started_cluster.unpause_container('hdfskerberos')

def test_prohibited(started_cluster):
    # the suser config maps to specuser, which is not allowed to write outside /user/specuser
    node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
    try:
        node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
        assert False, "Exception have to be thrown"
    except Exception as ex:
        assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)

def test_cache_path(started_cluster):
    node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
    try:
        node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
        assert False, "Exception have to be thrown"
    except Exception as ex:
        assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)

if __name__ == '__main__':
    cluster.start()
    input("Cluster created, press any key to destroy...")
    cluster.shutdown()
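A note on running this suite: it drives Docker through the integration-test helpers, so the kerberized Hadoop and KDC images introduced above have to be available locally. A hedged sketch, assuming the file is saved as test.py in its own directory under tests/integration (the directory name is not shown in this view, so the placeholder below is illustrative):

    cd tests/integration
    pytest -s <suite_dir>/test.py    # <suite_dir> is a placeholder for this suite's directory

The trailing __main__ block is there for manual debugging: started that way, the cluster stays up until a key is pressed, which makes it possible to poke at the kerberized HDFS and KDC containers by hand.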