cleanup, fixes, new submodules, ShellCommand, WriteBufferFromString

Ilya Golshtein 2020-10-30 22:40:16 +03:00
parent d1d657335b
commit 7e97814859
27 changed files with 179 additions and 385 deletions

contrib/libgsasl (vendored submodule)

@@ -1 +1 @@
-Subproject commit 140fb58250588c8323285b75fcf127c4adc33dfa
+Subproject commit 383ee28e82f69fa16ed43b48bd9c8ee5b313ab84

contrib/libhdfs3 (vendored submodule)

@@ -1 +1 @@
-Subproject commit 30552ac527f2c14070d834e171493b2e7f662375
+Subproject commit 095b9d48b400abb72d967cb0539af13b1e3d90cf

@@ -33,6 +33,11 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
 include(Platform)
 include(Options)
+# # prefer shared libraries
+# if (WITH_KERBEROS)
+#    find_package(KERBEROS REQUIRED)
+# endif()

 # source
 set(PROTO_FILES
 #${HDFS3_SOURCE_DIR}/proto/encryption.proto

@@ -207,14 +212,11 @@ target_include_directories(hdfs3 PRIVATE ${HDFS3_COMMON_DIR})
 target_include_directories(hdfs3 PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
 target_include_directories(hdfs3 PRIVATE ${LIBGSASL_INCLUDE_DIR})
-# if (WITH_KERBEROS)
-#    target_include_directories(hdfs3 PRIVATE ${KERBEROS_INCLUDE_DIRS})
-# endif()
 target_include_directories(hdfs3 PRIVATE ${LIBXML2_INCLUDE_DIR})

 target_link_libraries(hdfs3 PRIVATE ${LIBGSASL_LIBRARY})
 if (WITH_KERBEROS)
-    target_link_libraries(hdfs3 PUBLIC ${KRB5_LIBRARY})
+    target_link_libraries(hdfs3 PRIVATE ${KRB5_LIBRARY})
 endif()
 target_link_libraries(hdfs3 PRIVATE ${LIBXML2_LIBRARIES})

@@ -0,0 +1,10 @@
+# docker build -t ilejn/kerberized-hadoop .
+FROM sequenceiq/hadoop-docker:2.7.0
+RUN yum --quiet --assumeyes install krb5-workstation.x86_64
+RUN cd /tmp && \
+    curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
+    tar xzf commons-daemon-1.0.15-src.tar.gz && \
+    cd commons-daemon-1.0.15-src/src/native/unix && \
+    ./configure && \
+    make && \
+    cp ./jsvc /usr/local/hadoop/sbin

@@ -29,6 +29,8 @@ RUN apt-get update \
     libcurl4-openssl-dev \
     gdb \
     software-properties-common \
+    libkrb5-dev \
+    krb5-user \
     && rm -rf \
         /var/lib/apt/lists/* \
         /var/cache/debconf \

@@ -75,7 +77,8 @@ RUN python3 -m pip install \
     pytest-timeout \
     redis \
     tzlocal \
-    urllib3
+    urllib3 \
+    requests-kerberos

 COPY modprobe.sh /usr/local/bin/modprobe
 COPY dockerd-entrypoint.sh /usr/local/bin/

@@ -3,14 +3,14 @@ version: '2.3'
 services:
   kerberizedhdfs1:
     cap_add:
-      - CAP_DAC_READ_SEARCH
+      - DAC_READ_SEARCH
     image: ilejn/kerberized-hadoop:latest
     hostname: kerberizedhdfs1
     restart: always
     volumes:
       - ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro
       - ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
-      - ${KERBERIZED_HDFS_DIR}/secrets/krb.conf:/etc/krb5.conf:ro
+      - ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro
     ports:
       - 1006:1006
       - 50070:50070

@@ -20,7 +20,7 @@ services:
     entrypoint: /etc/bootstrap.sh -d

   hdfskerberos:
-    image: yandex/clickhouse-kerberos-kdc:latest
+    image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
     hostname: hdfskerberos
     volumes:
      - ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab

@@ -183,12 +183,17 @@ hadoop\_kerberos\_keytab
 hadoop\_kerberos\_principal
 hadoop\_kerberos\_kinit\_command

+#### Limitations {#limitations}
+* hadoop\_security\_kerberos\_ticket\_cache\_path can be global only, not user specific
+
 ## Kerberos support {#kerberos-support}

 If the hadoop\_security\_authentication parameter has the value 'kerberos', ClickHouse authenticates via Kerberos.
 Parameters [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help.
 Note that due to libhdfs3 limitations only the old-fashioned approach is supported;
-datanode communications are not secured by SASL. Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
+datanode communications are not secured by SASL (HADOOP\_SECURE\_DN\_USER is a reliable indicator of this
+security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
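For orientation, a minimal sketch of how these parameters might be placed in the ClickHouse server configuration; the parameter names come from the documentation above, while the keytab path, principal, and cache path are illustrative values, not taken from this commit:

<hdfs>
    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>  <!-- illustrative path -->
    <hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>  <!-- illustrative principal -->
    <hadoop_security_kerberos_ticket_cache_path>/tmp/krb5cc_clickhouse</hadoop_security_kerberos_ticket_cache_path>  <!-- global only, see Limitations -->
</hdfs>

hadoop\_kerberos\_kinit\_command can usually be left at its default ("kinit"); per the code changes in this commit, ClickHouse runs that command to (re)acquire a ticket before connecting.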
 ## Virtual Columns {#virtual-columns}

@@ -199,5 +204,4 @@ datanode communications are not secured by SASL. Use tests/integration/test\_sto
 - [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)

 [Original article](https://clickhouse.tech/docs/en/operations/table_engines/hdfs/) <!--hide-->

@@ -887,31 +887,6 @@
     </http_handlers>
     -->
-    Uncomment to disable ClickHouse internal DNS caching.
-    <disable_internal_dns_cache>1</disable_internal_dns_cache>
-    <!-- <kafka> -->
-    <!--     <auto_offset_reset>earliest</auto_offset_reset> -->
-    <!--     <!-\- Debugging of possible issues, like: -->
-    <!--         - https://github.com/edenhill/librdkafka/issues/2077 -->
-    <!--         - https://github.com/edenhill/librdkafka/issues/1778 -->
-    <!--         - #5615 -->
-    <!--         XXX: for now this messages will appears in stderr. -->
-    <!--     -\-> -->
-    <!--     <security_protocol>SASL_PLAINTEXT</security_protocol> -->
-    <!--     <sasl_mechanism>GSSAPI</sasl_mechanism> -->
-    <!--     <sasl_kerberos_service_name>kafka</sasl_kerberos_service_name> -->
-    <!--     <sasl_kerberos_keytab>/tmp/keytab/clickhouse.keytab</sasl_kerberos_keytab> -->
-    <!--     <!-\- <sasl_kerberos_principal>kafkauser/kerberized_kafka1@TEST.CONFLUENT.IO"</sasl_kerberos_principal> -\-> -->
-    <!--     <sasl_kerberos_principal>kafkauser@TEST.CONFLUENT.IO"</sasl_kerberos_principal> -->
-    <!--     <debug>security</debug> -->
-    <!--     <api_version_request>false</api_version_request> -->
-    <!-- </kafka> -->
-    <!-- <kafka_consumer_hang> -->
-    <!--     <!-\- default: 3000 -\-> -->
-    <!--     <heartbeat_interval_ms>300</heartbeat_interval_ms> -->
-    <!--     <!-\- default: 10000 -\-> -->
-    <!--     <session_timeout_ms>6000</session_timeout_ms> -->
-    <!-- </kafka_consumer_hang> -->
+    <!-- Uncomment to disable ClickHouse internal DNS caching. -->
+    <!-- <disable_internal_dns_cache>1</disable_internal_dns_cache> -->
 </yandex>

@@ -88,6 +88,10 @@ if (USE_AWS_S3)
     add_headers_and_sources(dbms Disks/S3)
 endif()

+if (USE_HDFS)
+    add_headers_and_sources(dbms Storages/HDFS)
+endif()
+
 list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
 list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})

@@ -389,8 +393,8 @@ if (USE_GRPC)
 endif()

 if (USE_HDFS)
-    target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
-    target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
+    dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY})
+    dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
 endif()

 if (USE_AWS_S3)

@@ -148,11 +148,6 @@ using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePool
 class BackgroundSchedulePoolTaskHolder
 {
 public:
-    using CleanupFunc = std::function<void()>;
-    CleanupFunc cleanup_func;
-
     BackgroundSchedulePoolTaskHolder() = default;
     explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
     BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;

@@ -164,8 +159,6 @@ public:
     {
         if (task_info)
             task_info->deactivate();
-        if (cleanup_func)
-            cleanup_func();
     }

     operator bool() const { return task_info != nullptr; }

@@ -173,8 +166,6 @@ public:
     BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
     const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }

-    void setCleanupFunc(const CleanupFunc function) {cleanup_func = function;}
-
 private:
     BackgroundSchedulePoolTaskInfoPtr task_info;
 };

@@ -1,12 +1,12 @@
-#include <IO/HDFSCommon.h>
+#include <Storages/HDFS/HDFSCommon.h>
 #include <Poco/URI.h>
+#include <Poco/Util/AbstractConfiguration.h>
 #include <boost/algorithm/string/replace.hpp>
-#include <Interpreters/Context.h>

 #if USE_HDFS
+#include <Common/ShellCommand.h>
 #include <Common/Exception.h>
+#include <IO/WriteBufferFromString.h>
+#include <IO/Operators.h>

 namespace DB
 {

@@ -19,25 +19,9 @@ extern const int EXCESSIVE_ELEMENT_IN_CONFIG;

 const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";

-// void HDFSBuilderWrapper::makeCachePath(const String & cachePath, String user)
-// {
-//     if (hadoop_security_kerberos_ticket_cache_path.empty())
-//     {
-//         hadoop_security_kerberos_ticket_cache_path = cachePath + "KRB5CACHEPATH" + user;
-//         hdfsBuilderSetKerbTicketCachePath(hdfs_builder, hadoop_security_kerberos_ticket_cache_path.c_str());
-//     }
-// }

 void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
     const String & config_path, bool isUser)
 {
-    hdfsBuilderConfSetStr(hdfs_builder, "input.read.timeout", "60000"); // 1 min
-    hdfsBuilderConfSetStr(hdfs_builder, "input.write.timeout", "60000"); // 1 min
-    hdfsBuilderConfSetStr(hdfs_builder, "input.connect.timeout", "60000"); // 1 min
-    // hdfsBuilderConfSetStr(rawBuilder, "hadoop.security.authentication", "kerberos");
-    // hdfsBuilderConfSetStr(rawBuilder, "dfs.client.log.severity", "TRACE");

     Poco::Util::AbstractConfiguration::Keys keys;
     config.keys(config_path, keys);

@@ -85,20 +69,17 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
         const auto & [k,v] = keep(key_name, config.getString(key_path));
         hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
     }
 }

 String HDFSBuilderWrapper::getKinitCmd()
 {
-    std::stringstream ss;
+    WriteBufferFromOwnString ss;

-<<<<<<< HEAD
     String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
         String() :
         (String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");

     ss << hadoop_kerberos_kinit_command << cache_name <<
         " -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
         "|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<

@@ -113,21 +94,14 @@ void HDFSBuilderWrapper::runKinit()
     std::unique_lock<std::mutex> lck(kinit_mtx);

-    int ret = system(cmd.c_str());
-    if (ret)
-    { // check it works !!
-        throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR);
+    auto command = ShellCommand::execute(cmd);
+    auto status = command->tryWait();
+    if (status)
+    {
+        throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
     }
 }
-=======
-    ss << "kinit -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
-        "|| kinit -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
-    return ss.str();
-}
->>>>>>> kerberized hdfs compiled

 HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context)
 {
     const Poco::URI uri(uri_str);

@@ -142,46 +116,15 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
         throw Exception("Unable to create builder to connect to HDFS: " +
             uri.toString() + " " + String(hdfsGetLastError()),
             ErrorCodes::NETWORK_ERROR);
-    // hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
-    // hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
-    // hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
-    // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos");
-    // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE");
-    const auto & config = context.getConfigRef();
-    if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
-    {
-        builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
-        if (builder.needKinit)
-        {
-            String cmd = builder.getKinitCmd();
-            int ret = system(cmd.c_str());
-            if (ret)
-            {
-                throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR);
-            }
-        }
-    }
-<<<<<<< HEAD
-    // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos");
-    // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE");
-    const auto & config = context.getConfigRef();
+    hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
+    hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
+    hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min

     String user_info = uri.getUserInfo();
     String user;
     if (!user_info.empty() && user_info.front() != ':')
     {
-=======
-    String user_info = uri.getUserInfo();
-    if (!user_info.empty() && user_info.front() != ':')
-    {
-        String user;
->>>>>>> kerberized hdfs compiled
         size_t delim_pos = user_info.find(':');
         if (delim_pos != String::npos)
             user = user_info.substr(0, delim_pos);

@@ -196,11 +139,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
         hdfsBuilderSetNameNodePort(builder.get(), port);
     }

-    // const auto & config = context.getGlobalContext().getConfigRef();
+    const auto & config = context.getConfigRef();
     if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
     {
         builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
-        // builder.makeCachePath(context.getUserFilesPath());
     }

     if (!user.empty())

@@ -211,17 +154,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
 #if USE_INTERNAL_HDFS3_LIBRARY
             builder.loadFromConfig(config, user_config_prefix, true);
 #else
             throw Exception("Multi user HDFS configuration required internal libhdfs3",
                 ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
 #endif
         }
-        // builder.makeCachePath(context.getUserFilesPath(), user);
     }
-    // else
-    // {
-    //     builder.makeCachePath(context.getUserFilesPath());
-    // }

     if (builder.needKinit)
     {

@@ -1,74 +1,31 @@
 #pragma once

 #include <Common/config.h>

+#if USE_HDFS
 #include <memory>
 #include <type_traits>
 #include <vector>
-#include <string>

-#if USE_HDFS
 #include <hdfs/hdfs.h>
-#include <Storages/IStorage.h>
+#include <common/types.h>
+#include <mutex>
+
+#include <Interpreters/Context.h>
+#include <Poco/Util/AbstractConfiguration.h>

 namespace DB
 {
 namespace detail
 {
-/* struct HDFSBuilderDeleter */
-/* { */
-/*     void operator()(hdfsBuilder * builder_ptr) */
-/*     { */
-/*         hdfsFreeBuilder(builder_ptr); */
-/*     } */
-/* }; */
-
 struct HDFSFsDeleter
 {
     void operator()(hdfsFS fs_ptr)
     {
         hdfsDisconnect(fs_ptr);
     }
 };
-
-#if 0
-class KinitTaskHolder
-{
-    using Container = std::map<std::string, BackgroundSchedulePool::TaskHolder>;
-    Container container;
-
-    String make_key(const HDFSBuilderWrapper & hbw)
-    {
-        return hbw.hadoop_kerberos_keytab + "^"
-            + hbw.hadoop_kerberos_principal + "^"
-            + std::to_string(time_relogin);
-    }
-
-public:
-    using Descriptor = Container::iterator;
-
-    Descriptor addTask(const HDFSBuilderWrapper & hdfs_builder_wrapper)
-    {
-        auto key = make_key(hdfs_builder_wrapper);
-        auto it = container.find(key);
-        if ( it != std::end(container))
-        {
-            it = container.insert({key, task}).first;
-        }
-
-        return it.second->getptr();
-    }
-
-    void delTask(Descriptor it)
-    {
-        container.erase(it);
-    }
-};
-#endif
 }

 struct HDFSFileInfo

@@ -92,26 +49,25 @@ struct HDFSFileInfo
     }
 };

 class HDFSBuilderWrapper
 {
     hdfsBuilder * hdfs_builder;
     String hadoop_kerberos_keytab;
     String hadoop_kerberos_principal;
     String hadoop_kerberos_kinit_command = "kinit";
     String hadoop_security_kerberos_ticket_cache_path;

     static std::mutex kinit_mtx;

     std::vector<std::pair<String, String>> config_stor;
+    // hdfs builder relies on an external config data storage

     std::pair<String, String>& keep(const String & k, const String & v)
     {
         return config_stor.emplace_back(std::make_pair(k, v));
     }

-    void loadFromConfig(const Poco::Util::AbstractConfiguration & config,
-        const String & config_path, bool isUser = false);
+    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);

     String getKinitCmd();

@@ -119,8 +75,6 @@ class HDFSBuilderWrapper
     void runKinit();

-    void makeCachePath(const String & cachePath, String user = "");
-
     static const String CONFIG_PREFIX;

 public:

@@ -148,10 +102,6 @@ public:
     friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context);
 };

-/* using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>; */
 using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;

 // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large

@@ -1,8 +1,7 @@
 #include "ReadBufferFromHDFS.h"

 #if USE_HDFS
-#include <Interpreters/Context.h>
-#include <IO/HDFSCommon.h>
+#include <Storages/HDFS/HDFSCommon.h>
 #include <hdfs/hdfs.h>
 #include <mutex>

@@ -28,11 +27,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
     HDFSFSPtr fs;

     explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_, const Context & context_)
-        : hdfs_uri(hdfs_name_)
+        : hdfs_uri(hdfs_name_),
+          builder(createHDFSBuilder(hdfs_uri, context_))
     {
         std::lock_guard lock(hdfs_init_mutex);
-        builder = createHDFSBuilder(hdfs_uri, context_);
         fs = createHDFSFS(builder.get());

         const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
         const std::string path = hdfs_uri.substr(begin_of_path);

@@ -4,11 +4,17 @@
 #if USE_HDFS
 #include <IO/ReadBuffer.h>
-#include <Storages/IStorage.h>
 #include <IO/BufferWithOwnMemory.h>
 #include <string>
 #include <memory>
+#include <hdfs/hdfs.h>
+#include <common/types.h>
+#include <Interpreters/Context.h>

 namespace DB
 {
 /** Accepts HDFS path to file and opens it.

@@ -20,9 +26,6 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
     std::unique_ptr<ReadBufferFromHDFSImpl> impl;
 public:
     ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
-    // ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
+    ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;

     ~ReadBufferFromHDFS() override;

     bool nextImpl() override;

@@ -3,15 +3,15 @@
 #if USE_HDFS

 #include <Storages/StorageFactory.h>
-#include <Storages/StorageHDFS.h>
+#include <Storages/HDFS/StorageHDFS.h>
 #include <Interpreters/Context.h>
 #include <Interpreters/evaluateConstantExpression.h>
 #include <Parsers/ASTLiteral.h>
 #include <IO/ReadHelpers.h>
-#include <IO/ReadBufferFromHDFS.h>
-#include <IO/WriteBufferFromHDFS.h>
+#include <Storages/HDFS/ReadBufferFromHDFS.h>
+#include <Storages/HDFS/WriteBufferFromHDFS.h>
 #include <IO/WriteHelpers.h>
-#include <IO/HDFSCommon.h>
+#include <Storages/HDFS/HDFSCommon.h>
 #include <Formats/FormatFactory.h>
 #include <DataTypes/DataTypeString.h>
 #include <DataStreams/IBlockOutputStream.h>

@@ -176,7 +176,7 @@ public:
     HDFSBlockOutputStream(const String & uri,
         const String & format,
         const Block & sample_block_,
-        Context & context,
+        const Context & context,
         const CompressionMethod compression_method)
         : sample_block(sample_block_)
     {

@@ -3,8 +3,8 @@
 #if USE_HDFS

 #include <Interpreters/Context.h>
-#include <IO/WriteBufferFromHDFS.h>
-#include <IO/HDFSCommon.h>
+#include <Storages/HDFS/WriteBufferFromHDFS.h>
+#include <Storages/HDFS/HDFSCommon.h>
 #include <hdfs/hdfs.h>

@@ -37,9 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
         if (path.find_first_of("*?{") != std::string::npos)
             throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);

-        // int flags = hdfsExists(fs.get(), path.c_str()) ? (O_WRONLY|O_SYNC) : (O_WRONLY|O_APPEND|O_SYNC); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
-        // fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 1024*1024);
         if (!hdfsExists(fs.get(), path.c_str()))
             throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
         fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here

@@ -80,8 +77,6 @@ WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const C
     : BufferWithOwnMemory<WriteBuffer>(buf_size)
     , impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, context))
 {
-    // auto modified_context = std::make_shared<Context>(context);
-    // impl = std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, modified_context);
 }

@@ -4,7 +4,6 @@
 #if USE_HDFS

 #include <IO/WriteBuffer.h>
-#include <Storages/IStorage.h>
 #include <IO/BufferWithOwnMemory.h>
 #include <string>
 #include <memory>

@@ -2,7 +2,7 @@
 #include "registerTableFunctions.h"

 #if USE_HDFS
-#include <Storages/StorageHDFS.h>
+#include <Storages/HDFS/StorageHDFS.h>
 #include <Storages/ColumnsDescription.h>
 #include <TableFunctions/TableFunctionFactory.h>
 #include <TableFunctions/TableFunctionHDFS.h>

@@ -169,7 +169,7 @@ class ClickHouseCluster:
             cmd += " client"
         return cmd

-    def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries = None,
+    def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
                      macros=None,
                      with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
                      clickhouse_path_dir=None,

@@ -321,7 +321,8 @@ class ClickHouseCluster:
             self.with_kerberized_hdfs = True
             self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
             self.base_kerberized_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
-                                             self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
+                                             self.project_name, '--file',
+                                             p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
             cmds.append(self.base_kerberized_hdfs_cmd)

         if with_mongo and not self.with_mongo:

@@ -485,38 +486,32 @@ class ClickHouseCluster:
         raise Exception("Cannot wait ZooKeeper container")

-    def wait_hdfs_to_start(self, timeout=60, kerberized=False):
-        start = time.time()
+    def make_hdfs_api(self, timeout=60, kerberized=False):
         if kerberized:
             keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
-            krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb.conf"))
+            krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
             hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
-            print("kerberizedhdfs1 ip ", hdfs_ip)
+            # print("kerberizedhdfs1 ip ", hdfs_ip)
             kdc_ip = self.get_instance_ip('hdfskerberos')
-            print("kdc_ip ", kdc_ip)
+            # print("kdc_ip ", kdc_ip)
             self.hdfs_api = HDFSApi(user="root",
                                     timeout=timeout,
                                     kerberized=True,
                                     principal="root@TEST.CLICKHOUSE.TECH",
                                     keytab=keytab,
                                     krb_conf=krb_conf,
-                                    # host="kerberizedhdfs1.test.clickhouse.tech",
                                     host="kerberizedhdfs1",
                                     protocol="http",
-                                    # protocol="https",
                                     proxy_port=50070,
-                                    # proxy_port=50470,
-                                    # data_port=50475,
                                     data_port=1006,
                                     hdfs_ip=hdfs_ip,
                                     kdc_ip=kdc_ip)
-            # self.hdfs_api = hdfs_api
         else:
             self.hdfs_api = HDFSApi(user="root", host="hdfs1")
-            # time.sleep(150)
-            # return

+    def wait_hdfs_to_start(self, timeout=60):
+        start = time.time()
         while time.time() - start < timeout:
             try:
                 self.hdfs_api.write_data("/somefilewithrandomname222", "1")

@@ -658,6 +653,7 @@ class ClickHouseCluster:
             self.wait_schema_registry_to_start(120)

         if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
+            print('Setup kerberized kafka')
             env = os.environ.copy()
             env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
             subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)

@@ -669,21 +665,16 @@ class ClickHouseCluster:
         if self.with_hdfs and self.base_hdfs_cmd:
             print('Setup HDFS')
             subprocess_check_call(self.base_hdfs_cmd + common_opts)
+            self.make_hdfs_api()
             self.wait_hdfs_to_start(120)

         if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
             print('Setup kerberized HDFS')
-            env_var = {}
-            env_var['KERBERIZED_HDFS_DIR'] = instance.path + '/'
-
-            # different docker_compose versions look for .env in different places
-            # -- env-file too recent to rely on it
-            files_to_cleanup = []
-            files_to_cleanup.append(_create_env_file(self.base_dir, env_var, ".env"))
-            files_to_cleanup.append(_create_env_file(os.getcwd(), env_var, ".env"))
-            subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env_var)
-            self.wait_hdfs_to_start(kerberized=True, timeout=300)
-            remove_files(files_to_cleanup)
+            env = os.environ.copy()
+            env['KERBERIZED_HDFS_DIR'] = instance.path + '/'
+            subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env)
+            self.make_hdfs_api(kerberized=True)
+            self.wait_hdfs_to_start(timeout=300)

         if self.with_mongo and self.base_mongo_cmd:
             print('Setup Mongo')

@@ -940,7 +931,7 @@ class ClickHouseInstance:
         if with_kerberized_kafka or with_kerberized_hdfs:
             self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
-            self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb_ch.conf:/etc/krb5.conf:ro"
+            self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
         else:
             self.keytab_path = ""
             self.krb5_conf = ""

@@ -4,6 +4,7 @@ import gzip
 import subprocess
 import time
 from tempfile import NamedTemporaryFile
+import requests
 import requests_kerberos as reqkerb
 import socket
 import tempfile

@@ -13,9 +14,9 @@ import os
 g_dns_hook = None

 def custom_getaddrinfo(*args):
-    print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
+    # print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
     ret = g_dns_hook.custom_getaddrinfo(*args)
-    print("g_dns_hook.custom_getaddrinfo result", ret)
+    # print("g_dns_hook.custom_getaddrinfo result", ret)
     return ret

@@ -28,7 +29,7 @@ class mk_krb_conf(object):
         with open(self.krb_conf) as f:
             content = f.read()
         amended_content = content.replace('hdfskerberos', self.kdc_ip)
-        self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False)
+        self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False, mode="w+")
         self.amended_krb_conf.write(amended_content)
         self.amended_krb_conf.close()
         return self.amended_krb_conf.name

@@ -36,38 +37,32 @@ class mk_krb_conf(object):
         if self.amended_krb_conf is not None:
             self.amended_krb_conf.close()

+# tweak dns resolution to connect to localhost where api_host is in URL
 class dns_hook(object):
     def __init__(self, hdfs_api):
-        print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
+        # print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
         self.hdfs_api = hdfs_api
     def __enter__(self):
         global g_dns_hook
         g_dns_hook = self
-        if True: # self.hdfs_api.kerberized:
-            print("g_dns_hook is None ", g_dns_hook is None)
-            self.original_getaddrinfo = socket.getaddrinfo
-            socket.getaddrinfo = custom_getaddrinfo
+        # print("g_dns_hook is None ", g_dns_hook is None)
+        self.original_getaddrinfo = socket.getaddrinfo
+        socket.getaddrinfo = custom_getaddrinfo
         return self
     def __exit__(self, type, value, traceback):
         global g_dns_hook
         g_dns_hook = None
-        if True: # self.hdfs_api.kerberized:
-            socket.getaddrinfo = self.original_getaddrinfo
+        socket.getaddrinfo = self.original_getaddrinfo
     def custom_getaddrinfo(self, *args):
         (hostname, port) = args[:2]
-        print("top of custom_getaddrinfo", hostname, port)
+        # print("top of custom_getaddrinfo", hostname, port)
         if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
-            print("dns_hook substitute")
-            return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))] #self.hdfs_api.hdfs_ip
+            # print("dns_hook substitute")
+            return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))]
         else:
             return self.original_getaddrinfo(*args)

-import requests

 class HDFSApi(object):
     def __init__(self, user, timeout=100, kerberized=False, principal=None,
                  keytab=None, krb_conf=None,

@@ -86,14 +81,11 @@ class HDFSApi(object):
         self.kdc_ip = kdc_ip
         self.krb_conf = krb_conf

-        logging.basicConfig(level=logging.DEBUG)
-        logging.getLogger().setLevel(logging.DEBUG)
-        requests_log = logging.getLogger("requests.packages.urllib3")
-        requests_log.setLevel(logging.DEBUG)
-        requests_log.propagate = True
+        # logging.basicConfig(level=logging.DEBUG)
+        # logging.getLogger().setLevel(logging.DEBUG)
+        # requests_log = logging.getLogger("requests.packages.urllib3")
+        # requests_log.setLevel(logging.DEBUG)
+        # requests_log.propagate = True

         if kerberized:
             self._run_kinit()

@@ -109,23 +101,23 @@ class HDFSApi(object):
             raise Exception("kerberos principal and keytab are required")

         with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
-            print("instantiated_krb_conf ", instantiated_krb_conf)
+            # print("instantiated_krb_conf ", instantiated_krb_conf)

             os.environ["KRB5_CONFIG"] = instantiated_krb_conf

             cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)

-            print(cmd)
+            # print(cmd)

             start = time.time()

             while time.time() - start < self.timeout:
                 try:
                     subprocess.call(cmd, shell=True)
-                    print "KDC started, kinit successfully run"
+                    print("KDC started, kinit successfully run")
                     return
                 except Exception as ex:
-                    print "Can't run kinit ... waiting" + str(ex)
+                    print("Can't run kinit ... waiting {}".format(str(ex)))
                     time.sleep(1)

             raise Exception("Kinit running failure")

@@ -137,7 +129,7 @@ class HDFSApi(object):
             response.raise_for_status()
         # additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
         url = "{location}".format(location=response.headers['Location'])
-        print("redirected to ", url)
+        # print("redirected to ", url)
         with dns_hook(self):
             response_data = requests.get(url,
                                          headers={'host': 'localhost'},

@@ -149,20 +141,6 @@ class HDFSApi(object):
         else:
             return response_data.content

-    # Requests can't put file
-    def _curl_to_put(self, filename, path, params):
-        url = "{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(protocol=self.protocol,
-                                                                                      host=self.host,
-                                                                                      port=self.data_port,
-                                                                                      path=path,
-                                                                                      params=params)
-        if self.kerberized:
-            cmd = "curl -k --negotiate -s -i -X PUT -T {fname} -u : '{url}' --resolve {host}:{port}:127.0.0.1".format(fname=filename, url=url)
-        else:
-            cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
-        output = subprocess.check_output(cmd, shell=True)
-        return output

     def write_data(self, path, content):
         named_file = NamedTemporaryFile(mode='wb+')
         fpath = named_file.name

@@ -173,12 +151,9 @@ class HDFSApi(object):
         if self.kerberized:
-            print("before request.put", os.environ["KRB5_CONFIG"])
             self._run_kinit()
-            # cmd = "klist"
-            # subprocess.call(cmd, shell=True)
             self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
-            print(self.kerberos_auth)
+            # print(self.kerberos_auth)

         with dns_hook(self):
             response = requests.put(

@@ -190,34 +165,26 @@ class HDFSApi(object):
                 params={'overwrite' : 'true'},
                 verify=False, auth=self.kerberos_auth
             )
-        print("after request.put", response.status_code)
         if response.status_code != 307:
-            print(response.headers)
+            # print(response.headers)
             response.raise_for_status()
-        print("after status code check")

         additional_params = '&'.join(
             response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])

-        if False: #not self.kerberized:
-            output = self._curl_to_put(fpath, path, additional_params)
-            if "201 Created" not in output:
-                raise Exception("Can't create file on hdfs:\n {}".format(output))
-        else:
-            with dns_hook(self), open(fpath) as fh:
-                file_data = fh.read()
-                protocol = "http" # self.protocol
-                response = requests.put(
-                    "{location}".format(location=response.headers['Location']),
-                    data=file_data,
-                    headers={'content-type':'text/plain', 'host': 'localhost'},
-                    params={'file': path, 'user.name' : self.user},
-                    allow_redirects=False, verify=False, auth=self.kerberos_auth
-                )
-                print(response)
-                if response.status_code != 201:
-                    response.raise_for_status()
+        with dns_hook(self), open(fpath, mode="rb") as fh:
+            file_data = fh.read()
+            protocol = "http" # self.protocol
+            response = requests.put(
+                "{location}".format(location=response.headers['Location']),
+                data=file_data,
+                headers={'content-type':'text/plain', 'host': 'localhost'},
+                params={'file': path, 'user.name' : self.user},
+                allow_redirects=False, verify=False, auth=self.kerberos_auth
+            )
+            # print(response)
+            if response.status_code != 201:
+                response.raise_for_status()

     def write_gzip_data(self, path, content):

@@ -1,6 +1,5 @@
 import pytest
 from helpers.cluster import ClickHouseCluster
-from helpers.hdfs_api import HDFSApi

 cluster = ClickHouseCluster(__file__)
 node1 = cluster.add_instance('node1', main_configs=['configs/config_with_hosts.xml'])

@@ -101,9 +100,8 @@ def test_table_function_remote(start_cluster):

 def test_redirect(start_cluster):
-    hdfs_api = HDFSApi("root")
-    hdfs_api.write_data("/simple_storage", "1\t\n")
-    assert hdfs_api.read_data("/simple_storage") == "1\t\n"
+    start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n")
+    assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n"
     node7.query(
         "CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)")
     assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1")

@@ -17,9 +17,8 @@ def started_cluster():

 def test_url_without_redirect(started_cluster):
-    hdfs_api = HDFSApi("root")
-    hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
-    assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+    started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+    assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"

     # access datanode port directly
     node1.query(

@@ -28,9 +27,8 @@ def test_url_without_redirect(started_cluster):

 def test_url_with_redirect_not_allowed(started_cluster):
-    hdfs_api = HDFSApi("root")
-    hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
-    assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+    started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+    assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"

     # access proxy port without allowing redirects
     node1.query(

@@ -40,9 +38,8 @@ def test_url_with_redirect_not_allowed(started_cluster):

 def test_url_with_redirect_allowed(started_cluster):
-    hdfs_api = HDFSApi("root")
-    hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
-    assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+    started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+    assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"

     # access proxy port with allowing redirects
     # http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0

@@ -203,5 +203,5 @@ def test_write_gzip_storage(started_cluster):

 if __name__ == '__main__':
     cluster.start()
-    raw_input("Cluster created, press any key to destroy...")
+    input("Cluster created, press any key to destroy...")
     cluster.shutdown()

@@ -7,8 +7,7 @@
     default_realm = TEST.CLICKHOUSE.TECH
     dns_lookup_realm = false
     dns_lookup_kdc = false
-    ticket_lifetime = 15d
-    # renew_lifetime = 15d
+    ticket_lifetime = 15s
     forwardable = true
     default_tgs_enctypes = des3-hmac-sha1
     default_tkt_enctypes = des3-hmac-sha1

@@ -7,8 +7,7 @@
     default_realm = TEST.CLICKHOUSE.TECH
     dns_lookup_realm = false
     dns_lookup_kdc = false
-    ticket_lifetime = 15s
-    # renew_lifetime = 15d
+    ticket_lifetime = 15d
     forwardable = true
     default_tgs_enctypes = des3-hmac-sha1
     default_tkt_enctypes = des3-hmac-sha1

@@ -6,7 +6,6 @@ import os
 from helpers.cluster import ClickHouseCluster
 import subprocess

 cluster = ClickHouseCluster(__file__)
 node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml'])

@@ -24,59 +23,40 @@ def started_cluster():
         cluster.shutdown()

 def test_read_table(started_cluster):
-    # hdfs_api = HDFSApi("root")
     data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
     started_cluster.hdfs_api.write_data("/simple_table_function", data)

     api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
-    print("api_read", api_read)
     assert api_read == data

     select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
-    print("select_read", select_read)
     assert select_read == data

 def test_read_write_storage(started_cluster):
-    # node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberized_hdfs1.test.clickhouse.tech:9000/simple_storage', 'TSV')")
     node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')")
     node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")

     api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
-    print("api_read", api_read)
     assert api_read == "1\tMark\t72.53\n"

     select_read = node1.query("select * from SimpleHDFSStorage2")
-    print("select_read", select_read)
     assert select_read == "1\tMark\t72.53\n"

-def test_write_storage_expired(started_cluster):
-    node1.query("create table SimpleHDFSStorageExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_expired', 'TSV')")
+def test_write_storage_not_expired(started_cluster):
+    node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_not_expired', 'TSV')")

     time.sleep(45) # wait for ticket expiration
-    node1.query("insert into SimpleHDFSStorageExpired values (1, 'Mark', 72.53)")
+    node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")

-    api_read = started_cluster.hdfs_api.read_data("/simple_storage_expired")
-    print("api_read", api_read)
+    api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired")
     assert api_read == "1\tMark\t72.53\n"

-    select_read = node1.query("select * from SimpleHDFSStorageExpired")
-    print("select_read", select_read)
+    select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
     assert select_read == "1\tMark\t72.53\n"

-def test_prohibited(started_cluster):
-    node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
-    try:
-        node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
-        assert False, "Exception have to be thrown"
-    except Exception as ex:
-        assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)

 def test_two_users(started_cluster):
     node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
     node1.query("insert into HDFSStorOne values (1, 'IlyaReal', 86.00)")

@@ -85,31 +65,10 @@ def test_two_users(started_cluster):
     node1.query("insert into HDFSStorTwo values (1, 'IlyaIdeal', 74.00)")

     select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
-    print("select_read_1", select_read_1)

     select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
-    print("select_read_2", select_read_2)

-    # node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
-    # try:
-    #     node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)")
-    #     assert False, "Exception have to be thrown"
-    # except Exception as ex:
-    #     print ex
-    #     assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex)

-def test_cache_path(started_cluster):
-    node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
-    try:
-        node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
-        assert False, "Exception have to be thrown"
-    except Exception as ex:
-        assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)

-def test_read_table_not_expired(started_cluster):
+def test_read_table_expired(started_cluster):
     data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
     started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)

@@ -125,13 +84,25 @@ def test_read_table_not_expired(started_cluster):
     started_cluster.unpause_container('hdfskerberos')

+def test_prohibited(started_cluster):
+    node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
+    try:
+        node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
+        assert False, "Exception have to be thrown"
+    except Exception as ex:
+        assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)

-@pytest.mark.timeout(999999)
-def _test_sleep_forever(started_cluster):
-    time.sleep(999999)
+def test_cache_path(started_cluster):
+    node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
+    try:
+        node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
+        assert False, "Exception have to be thrown"
+    except Exception as ex:
+        assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)

 if __name__ == '__main__':
     cluster.start()
-    raw_input("Cluster created, press any key to destroy...")
+    input("Cluster created, press any key to destroy...")
     cluster.shutdown()