diff --git a/contrib/libgsasl b/contrib/libgsasl
index 140fb582505..383ee28e82f 160000
--- a/contrib/libgsasl
+++ b/contrib/libgsasl
@@ -1 +1 @@
-Subproject commit 140fb58250588c8323285b75fcf127c4adc33dfa
+Subproject commit 383ee28e82f69fa16ed43b48bd9c8ee5b313ab84
diff --git a/contrib/libhdfs3 b/contrib/libhdfs3
index 30552ac527f..095b9d48b40 160000
--- a/contrib/libhdfs3
+++ b/contrib/libhdfs3
@@ -1 +1 @@
-Subproject commit 30552ac527f2c14070d834e171493b2e7f662375
+Subproject commit 095b9d48b400abb72d967cb0539af13b1e3d90cf
diff --git a/contrib/libhdfs3-cmake/CMakeLists.txt b/contrib/libhdfs3-cmake/CMakeLists.txt
index e1129a4bd59..49b35d09431 100644
--- a/contrib/libhdfs3-cmake/CMakeLists.txt
+++ b/contrib/libhdfs3-cmake/CMakeLists.txt
@@ -33,6 +33,11 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})
include(Platform)
include(Options)
+# # prefer shared libraries
+# if (WITH_KERBEROS)
+# find_package(KERBEROS REQUIRED)
+# endif()
+
# source
set(PROTO_FILES
#${HDFS3_SOURCE_DIR}/proto/encryption.proto
@@ -207,14 +212,11 @@ target_include_directories(hdfs3 PRIVATE ${HDFS3_COMMON_DIR})
target_include_directories(hdfs3 PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
target_include_directories(hdfs3 PRIVATE ${LIBGSASL_INCLUDE_DIR})
-# if (WITH_KERBEROS)
-# target_include_directories(hdfs3 PRIVATE ${KERBEROS_INCLUDE_DIRS})
-# endif()
target_include_directories(hdfs3 PRIVATE ${LIBXML2_INCLUDE_DIR})
target_link_libraries(hdfs3 PRIVATE ${LIBGSASL_LIBRARY})
if (WITH_KERBEROS)
- target_link_libraries(hdfs3 PUBLIC ${KRB5_LIBRARY})
+ target_link_libraries(hdfs3 PRIVATE ${KRB5_LIBRARY})
endif()
target_link_libraries(hdfs3 PRIVATE ${LIBXML2_LIBRARIES})
diff --git a/docker/test/integration/kerberized_hadoop/Dockerfile b/docker/test/integration/kerberized_hadoop/Dockerfile
new file mode 100644
index 00000000000..08beab91a56
--- /dev/null
+++ b/docker/test/integration/kerberized_hadoop/Dockerfile
@@ -0,0 +1,10 @@
+# docker build -t ilejn/kerberized-hadoop .
+FROM sequenceiq/hadoop-docker:2.7.0
+RUN yum --quiet --assumeyes install krb5-workstation.x86_64
+RUN cd /tmp && \
+ curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \
+ tar xzf commons-daemon-1.0.15-src.tar.gz && \
+ cd commons-daemon-1.0.15-src/src/native/unix && \
+ ./configure && \
+ make && \
+ cp ./jsvc /usr/local/hadoop/sbin
diff --git a/docker/test/integration/runner/Dockerfile b/docker/test/integration/runner/Dockerfile
index 36188fc4a63..9b51891ccf5 100644
--- a/docker/test/integration/runner/Dockerfile
+++ b/docker/test/integration/runner/Dockerfile
@@ -29,6 +29,8 @@ RUN apt-get update \
libcurl4-openssl-dev \
gdb \
software-properties-common \
+ libkrb5-dev \
+ krb5-user \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \
@@ -75,7 +77,8 @@ RUN python3 -m pip install \
pytest-timeout \
redis \
tzlocal \
- urllib3
+ urllib3 \
+ requests-kerberos
COPY modprobe.sh /usr/local/bin/modprobe
COPY dockerd-entrypoint.sh /usr/local/bin/
diff --git a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
index 787d88880dd..de6bfa16785 100644
--- a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
+++ b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml
@@ -3,14 +3,14 @@ version: '2.3'
services:
kerberizedhdfs1:
cap_add:
- - CAP_DAC_READ_SEARCH
+ - DAC_READ_SEARCH
image: ilejn/kerberized-hadoop:latest
hostname: kerberizedhdfs1
restart: always
volumes:
- ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro
- ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf
- - ${KERBERIZED_HDFS_DIR}/secrets/krb.conf:/etc/krb5.conf:ro
+ - ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro
ports:
- 1006:1006
- 50070:50070
@@ -20,7 +20,7 @@ services:
entrypoint: /etc/bootstrap.sh -d
hdfskerberos:
- image: yandex/clickhouse-kerberos-kdc:latest
+ image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG}
hostname: hdfskerberos
volumes:
- ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab
diff --git a/docs/en/engines/table-engines/integrations/hdfs.md b/docs/en/engines/table-engines/integrations/hdfs.md
index d621c4c30c7..e85982e2b85 100644
--- a/docs/en/engines/table-engines/integrations/hdfs.md
+++ b/docs/en/engines/table-engines/integrations/hdfs.md
@@ -183,12 +183,17 @@ hadoop\_kerberos\_keytab
hadoop\_kerberos\_principal
hadoop\_kerberos\_kinit\_command
+#### Limitations {#limitations}
+
+  * hadoop\_security\_kerberos\_ticket\_cache\_path can only be set globally, not per user
+
## Kerberos support {#kerberos-support}
If hadoop\_security\_authentication parameter has value 'kerberos', ClickHouse authentifies via Kerberos facility.
Parameters [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help.
Note that due to libhdfs3 limitations only old-fashioned approach is supported,
-datanode communications are not secured by SASL. Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
+datanode communications are not secured by SASL (the presence of HADOOP\_SECURE\_DN\_USER is a reliable indicator of this
+security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference.
## Virtual Columns {#virtual-columns}
@@ -199,5 +204,4 @@ datanode communications are not secured by SASL. Use tests/integration/test\_sto
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)
-
[Original article](https://clickhouse.tech/docs/en/operations/table_engines/hdfs/)
diff --git a/programs/server/config.xml b/programs/server/config.xml
index 9ea8c2e5b51..f41c346bbed 100644
--- a/programs/server/config.xml
+++ b/programs/server/config.xml
@@ -887,31 +887,6 @@
-->
- Uncomment to disable ClickHouse internal DNS caching.
- 1
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 6021065f937..46956181974 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -88,6 +88,10 @@ if (USE_AWS_S3)
add_headers_and_sources(dbms Disks/S3)
endif()
+if (USE_HDFS)
+ add_headers_and_sources(dbms Storages/HDFS)
+endif()
+
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
@@ -389,8 +393,8 @@ if (USE_GRPC)
endif()
if (USE_HDFS)
- target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
- target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
+ dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY})
+ dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_AWS_S3)
diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h
index d9fe9af789c..092824c069a 100644
--- a/src/Core/BackgroundSchedulePool.h
+++ b/src/Core/BackgroundSchedulePool.h
@@ -148,11 +148,6 @@ using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr<BackgroundSchedulePoolTaskInfo>;
-
- CleanupFunc cleanup_func;
-
-
BackgroundSchedulePoolTaskHolder() = default;
explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {}
BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete;
@@ -164,8 +159,6 @@ public:
{
if (task_info)
task_info->deactivate();
- if (cleanup_func)
- cleanup_func();
}
operator bool() const { return task_info != nullptr; }
@@ -173,8 +166,6 @@ public:
BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); }
const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); }
- void setCleanupFunc(const CleanupFunc function) {cleanup_func = function;}
-
private:
BackgroundSchedulePoolTaskInfoPtr task_info;
};
diff --git a/src/IO/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp
similarity index 61%
rename from src/IO/HDFSCommon.cpp
rename to src/Storages/HDFS/HDFSCommon.cpp
index a68b4a96a99..72004f9b7d0 100644
--- a/src/IO/HDFSCommon.cpp
+++ b/src/Storages/HDFS/HDFSCommon.cpp
@@ -1,12 +1,12 @@
-#include
+#include
#include
-#include
#include
-#include
-
#if USE_HDFS
+#include
#include
+#include
+#include
namespace DB
{
@@ -19,25 +19,9 @@ extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
-// void HDFSBuilderWrapper::makeCachePath(const String & cachePath, String user)
-// {
-// if (hadoop_security_kerberos_ticket_cache_path.empty())
-// {
-// hadoop_security_kerberos_ticket_cache_path = cachePath + "KRB5CACHEPATH" + user;
-// hdfsBuilderSetKerbTicketCachePath(hdfs_builder, hadoop_security_kerberos_ticket_cache_path.c_str());
-// }
-// }
-
void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config,
const String & config_path, bool isUser)
{
- hdfsBuilderConfSetStr(hdfs_builder, "input.read.timeout", "60000"); // 1 min
- hdfsBuilderConfSetStr(hdfs_builder, "input.write.timeout", "60000"); // 1 min
- hdfsBuilderConfSetStr(hdfs_builder, "input.connect.timeout", "60000"); // 1 min
-
- // hdfsBuilderConfSetStr(rawBuilder, "hadoop.security.authentication", "kerberos");
- // hdfsBuilderConfSetStr(rawBuilder, "dfs.client.log.severity", "TRACE");
-
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_path, keys);
@@ -85,20 +69,17 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration
const auto & [k,v] = keep(key_name, config.getString(key_path));
hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str());
-
}
}
String HDFSBuilderWrapper::getKinitCmd()
{
- std::stringstream ss;
-<<<<<<< HEAD
+ WriteBufferFromOwnString ss;
String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ?
String() :
(String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\"");
-
ss << hadoop_kerberos_kinit_command << cache_name <<
" -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
"|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" <<
@@ -113,21 +94,14 @@ void HDFSBuilderWrapper::runKinit()
std::unique_lock lck(kinit_mtx);
- int ret = system(cmd.c_str());
- if (ret)
- { // check it works !!
- throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR);
+ auto command = ShellCommand::execute(cmd);
+ auto status = command->tryWait();
+ if (status)
+ {
+ throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS);
}
}
-
-=======
- ss << "kinit -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal <<
- "|| kinit -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal;
- return ss.str();
-}
-
->>>>>>> kerberized hdfs compiled
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context)
{
const Poco::URI uri(uri_str);
@@ -142,46 +116,15 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
throw Exception("Unable to create builder to connect to HDFS: " +
uri.toString() + " " + String(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
- // hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
- // hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
- // hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
-
- // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos");
- // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE");
-
- const auto & config = context.getConfigRef();
- if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
- {
- builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
- if (builder.needKinit)
- {
- String cmd = builder.getKinitCmd();
- int ret = system(cmd.c_str());
- if (ret)
- {
- throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR);
- }
- }
- }
-
-<<<<<<< HEAD
-
- // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos");
- // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE");
-
- const auto & config = context.getConfigRef();
+ hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
+ hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
+ hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
String user_info = uri.getUserInfo();
String user;
if (!user_info.empty() && user_info.front() != ':')
{
-=======
- String user_info = uri.getUserInfo();
- if (!user_info.empty() && user_info.front() != ':')
- {
- String user;
->>>>>>> kerberized hdfs compiled
size_t delim_pos = user_info.find(':');
if (delim_pos != String::npos)
user = user_info.substr(0, delim_pos);
@@ -196,11 +139,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
hdfsBuilderSetNameNodePort(builder.get(), port);
}
-
+ // const auto & config = context.getGlobalContext().getConfigRef();
+ const auto & config = context.getConfigRef();
if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX))
{
builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX);
- // builder.makeCachePath(context.getUserFilesPath());
}
if (!user.empty())
@@ -211,17 +154,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con
#if USE_INTERNAL_HDFS3_LIBRARY
builder.loadFromConfig(config, user_config_prefix, true);
#else
- throw Exception("Multi user HDFS configuration required internal libhdfs3",
- ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
+ throw Exception("Multi user HDFS configuration requires internal libhdfs3",
+ ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);
#endif
}
- // builder.makeCachePath(context.getUserFilesPath(), user);
}
- // else
- // {
- // builder.makeCachePath(context.getUserFilesPath());
- // }
-
if (builder.needKinit)
{
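For reference, the kinit handling above builds a "renew, else re-initialize" shell command (getKinitCmd) and now runs it through ShellCommand instead of system(). Below is a minimal Python sketch of the same command shape, assuming a plain kinit from krb5-user; it only illustrates the logic and is not the C++ implementation.

```python
import subprocess

def build_kinit_cmd(keytab, principal, cache_path=None, kinit="kinit"):
    # Same shape as HDFSBuilderWrapper::getKinitCmd() above:
    # try to renew an existing ticket first, otherwise obtain a fresh one.
    cache = ' -c "{}"'.format(cache_path) if cache_path else ""
    return '{k}{c} -R -t "{t}" -k {p} || {k}{c} -t "{t}" -k {p}'.format(
        k=kinit, c=cache, t=keytab, p=principal)

def run_kinit(keytab, principal, cache_path=None):
    # Mirrors HDFSBuilderWrapper::runKinit(): fail loudly on a non-zero exit code.
    cmd = build_kinit_cmd(keytab, principal, cache_path)
    if subprocess.call(cmd, shell=True) != 0:
        raise RuntimeError("kinit failure: " + cmd)

# Example (hypothetical keytab path and principal):
# run_kinit("/tmp/keytab/clickhouse.keytab", "root@TEST.CLICKHOUSE.TECH")
```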
diff --git a/src/IO/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h
similarity index 60%
rename from src/IO/HDFSCommon.h
rename to src/Storages/HDFS/HDFSCommon.h
index bd49c5ebd3e..92fdb4f6843 100644
--- a/src/IO/HDFSCommon.h
+++ b/src/Storages/HDFS/HDFSCommon.h
@@ -1,74 +1,31 @@
#pragma once
+
#include
+
+#if USE_HDFS
#include
#include
#include
-#include
-#if USE_HDFS
#include
-#include
+#include
+#include
+
+#include
+#include
+
namespace DB
{
namespace detail
{
-/* struct HDFSBuilderDeleter */
-/* { */
-/* void operator()(hdfsBuilder * builder_ptr) */
-/* { */
-/* hdfsFreeBuilder(builder_ptr); */
-/* } */
-/* }; */
-struct HDFSFsDeleter
-{
- void operator()(hdfsFS fs_ptr)
+ struct HDFSFsDeleter
{
- hdfsDisconnect(fs_ptr);
- }
-};
-
-
-
-#if 0
-
-class KinitTaskHolder
-{
- using Container = std::map;
- Container container;
-
-
- String make_key(const HDFSBuilderWrapper & hbw)
- {
- return hbw.hadoop_kerberos_keytab + "^"
- + hbw.hadoop_kerberos_principal + "^"
- + std::to_string(time_relogin);
- }
-
-public:
- using Descriptor = Container::iterator;
-
- Descriptor addTask(const HDFSBuilderWrapper & hdfs_builder_wrapper)
- {
- auto key = make_key(hdfs_builder_wrapper);
-
- auto it = container.find(key);
- if ( it != std::end(container))
+ void operator()(hdfsFS fs_ptr)
{
- it = container.insert({key, task}).first;
+ hdfsDisconnect(fs_ptr);
}
-
- return it.second->getptr();
-
- }
- void delTask(Descriptor it)
- {
- container.erase(it);
- }
-};
-
-#endif
-
+ };
}
struct HDFSFileInfo
@@ -92,26 +49,25 @@ struct HDFSFileInfo
}
};
-
class HDFSBuilderWrapper
{
hdfsBuilder * hdfs_builder;
String hadoop_kerberos_keytab;
String hadoop_kerberos_principal;
String hadoop_kerberos_kinit_command = "kinit";
- String hadoop_security_kerberos_ticket_cache_path;
+ String hadoop_security_kerberos_ticket_cache_path;
static std::mutex kinit_mtx;
std::vector<std::pair<String, String>> config_stor;
+ // hdfs builder relies on an external storage for config strings, which keep() holds alive
std::pair<String, String> & keep(const String & k, const String & v)
{
return config_stor.emplace_back(std::make_pair(k, v));
}
- void loadFromConfig(const Poco::Util::AbstractConfiguration & config,
- const String & config_path, bool isUser = false);
+ void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
String getKinitCmd();
@@ -119,8 +75,6 @@ class HDFSBuilderWrapper
void runKinit();
- void makeCachePath(const String & cachePath, String user = "");
-
static const String CONFIG_PREFIX;
public:
@@ -148,10 +102,6 @@ public:
friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context);
};
-
-
-
-/* using HDFSBuilderPtr = std::unique_ptr; */
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
diff --git a/src/IO/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
similarity index 93%
rename from src/IO/ReadBufferFromHDFS.cpp
rename to src/Storages/HDFS/ReadBufferFromHDFS.cpp
index 9caafda957c..20340136e3d 100644
--- a/src/IO/ReadBufferFromHDFS.cpp
+++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
@@ -1,8 +1,7 @@
#include "ReadBufferFromHDFS.h"
#if USE_HDFS
-#include
-#include
+#include
#include
#include
@@ -28,11 +27,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
HDFSFSPtr fs;
explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_, const Context & context_)
- : hdfs_uri(hdfs_name_)
+ : hdfs_uri(hdfs_name_),
+ builder(createHDFSBuilder(hdfs_uri, context_))
{
std::lock_guard lock(hdfs_init_mutex);
- builder = createHDFSBuilder(hdfs_uri, context_);
fs = createHDFSFS(builder.get());
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
diff --git a/src/IO/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h
similarity index 81%
rename from src/IO/ReadBufferFromHDFS.h
rename to src/Storages/HDFS/ReadBufferFromHDFS.h
index 1208406c7ee..ce71338a7fd 100644
--- a/src/IO/ReadBufferFromHDFS.h
+++ b/src/Storages/HDFS/ReadBufferFromHDFS.h
@@ -4,11 +4,17 @@
#if USE_HDFS
#include
-#include
#include
#include
#include
+#include
+
+#include
+
+#include
+
+
namespace DB
{
/** Accepts HDFS path to file and opens it.
@@ -20,9 +26,6 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
std::unique_ptr<ReadBufferFromHDFSImpl> impl;
public:
ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
-
- // ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
- ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
~ReadBufferFromHDFS() override;
bool nextImpl() override;
diff --git a/src/Storages/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
similarity index 98%
rename from src/Storages/StorageHDFS.cpp
rename to src/Storages/HDFS/StorageHDFS.cpp
index 49fa4444e0c..1f85388989e 100644
--- a/src/Storages/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -3,15 +3,15 @@
#if USE_HDFS
#include
-#include
+#include
#include
#include
#include
#include
-#include
-#include
+#include
+#include
#include
-#include
+#include
#include
#include
#include
@@ -176,7 +176,7 @@ public:
HDFSBlockOutputStream(const String & uri,
const String & format,
const Block & sample_block_,
- Context & context,
+ const Context & context,
const CompressionMethod compression_method)
: sample_block(sample_block_)
{
diff --git a/src/Storages/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h
similarity index 100%
rename from src/Storages/StorageHDFS.h
rename to src/Storages/HDFS/StorageHDFS.h
diff --git a/src/IO/WriteBufferFromHDFS.cpp b/src/Storages/HDFS/WriteBufferFromHDFS.cpp
similarity index 85%
rename from src/IO/WriteBufferFromHDFS.cpp
rename to src/Storages/HDFS/WriteBufferFromHDFS.cpp
index f1c98885c72..b3684826128 100644
--- a/src/IO/WriteBufferFromHDFS.cpp
+++ b/src/Storages/HDFS/WriteBufferFromHDFS.cpp
@@ -3,8 +3,8 @@
#if USE_HDFS
#include
-#include
-#include
+#include
+#include
#include
@@ -37,9 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
if (path.find_first_of("*?{") != std::string::npos)
throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);
- // int flags = hdfsExists(fs.get(), path.c_str()) ? (O_WRONLY|O_SYNC) : (O_WRONLY|O_APPEND|O_SYNC); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
- // fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 1024*1024);
-
if (!hdfsExists(fs.get(), path.c_str()))
throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS);
fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here
@@ -80,8 +77,6 @@ WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const C
: BufferWithOwnMemory<WriteBuffer>(buf_size)
, impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_, context))
{
- // auto modified_context = std::make_shared(context);
- // impl = std::make_unique(hdfs_name_, modified_context);
}
diff --git a/src/IO/WriteBufferFromHDFS.h b/src/Storages/HDFS/WriteBufferFromHDFS.h
similarity index 95%
rename from src/IO/WriteBufferFromHDFS.h
rename to src/Storages/HDFS/WriteBufferFromHDFS.h
index f633d28e6f3..a404935fd23 100644
--- a/src/IO/WriteBufferFromHDFS.h
+++ b/src/Storages/HDFS/WriteBufferFromHDFS.h
@@ -4,7 +4,6 @@
#if USE_HDFS
#include
-#include
#include
#include
#include
diff --git a/src/TableFunctions/TableFunctionHDFS.cpp b/src/TableFunctions/TableFunctionHDFS.cpp
index e2f227ef7b5..700cb93ca06 100644
--- a/src/TableFunctions/TableFunctionHDFS.cpp
+++ b/src/TableFunctions/TableFunctionHDFS.cpp
@@ -2,7 +2,7 @@
#include "registerTableFunctions.h"
#if USE_HDFS
-#include
+#include
#include
#include
#include
diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py
index c7dff0a816a..a65a420cd5b 100644
--- a/tests/integration/helpers/cluster.py
+++ b/tests/integration/helpers/cluster.py
@@ -169,7 +169,7 @@ class ClickHouseCluster:
cmd += " client"
return cmd
- def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries = None,
+ def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None,
macros=None,
with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False,
clickhouse_path_dir=None,
@@ -321,7 +321,8 @@ class ClickHouseCluster:
self.with_kerberized_hdfs = True
self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')])
self.base_kerberized_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name',
- self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
+ self.project_name, '--file',
+ p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]
cmds.append(self.base_kerberized_hdfs_cmd)
if with_mongo and not self.with_mongo:
@@ -485,38 +486,32 @@ class ClickHouseCluster:
raise Exception("Cannot wait ZooKeeper container")
- def wait_hdfs_to_start(self, timeout=60, kerberized=False):
- start = time.time()
+ def make_hdfs_api(self, timeout=60, kerberized=False):
if kerberized:
keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab"))
- krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb.conf"))
+ krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf"))
hdfs_ip = self.get_instance_ip('kerberizedhdfs1')
- print("kerberizedhdfs1 ip ", hdfs_ip)
+ # print("kerberizedhdfs1 ip ", hdfs_ip)
kdc_ip = self.get_instance_ip('hdfskerberos')
- print("kdc_ip ", kdc_ip)
+ # print("kdc_ip ", kdc_ip)
self.hdfs_api = HDFSApi(user="root",
timeout=timeout,
kerberized=True,
principal="root@TEST.CLICKHOUSE.TECH",
keytab=keytab,
krb_conf=krb_conf,
- # host="kerberizedhdfs1.test.clickhouse.tech",
host="kerberizedhdfs1",
protocol="http",
- # protocol="https",
proxy_port=50070,
- # proxy_port=50470,
- # data_port=50475,
data_port=1006,
hdfs_ip=hdfs_ip,
kdc_ip=kdc_ip)
- # self.hdfs_api = hdfs_api
else:
self.hdfs_api = HDFSApi(user="root", host="hdfs1")
- # time.sleep(150)
- # return
+ def wait_hdfs_to_start(self, timeout=60):
+ start = time.time()
while time.time() - start < timeout:
try:
self.hdfs_api.write_data("/somefilewithrandomname222", "1")
@@ -658,6 +653,7 @@ class ClickHouseCluster:
self.wait_schema_registry_to_start(120)
if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd:
+ print('Setup kerberized kafka')
env = os.environ.copy()
env['KERBERIZED_KAFKA_DIR'] = instance.path + '/'
subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env)
@@ -669,21 +665,16 @@ class ClickHouseCluster:
if self.with_hdfs and self.base_hdfs_cmd:
print('Setup HDFS')
subprocess_check_call(self.base_hdfs_cmd + common_opts)
+ self.make_hdfs_api()
self.wait_hdfs_to_start(120)
if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd:
print('Setup kerberized HDFS')
- env_var = {}
- env_var['KERBERIZED_HDFS_DIR'] = instance.path + '/'
-
- # different docker_compose versions look for .env in different places
- # -- env-file too recent to rely on it
- files_to_cleanup = []
- files_to_cleanup.append(_create_env_file(self.base_dir, env_var, ".env"))
- files_to_cleanup.append(_create_env_file(os.getcwd(), env_var, ".env"))
- subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env_var)
- self.wait_hdfs_to_start(kerberized=True, timeout=300)
- remove_files(files_to_cleanup)
+ env = os.environ.copy()
+ env['KERBERIZED_HDFS_DIR'] = instance.path + '/'
+ subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env)
+ self.make_hdfs_api(kerberized=True)
+ self.wait_hdfs_to_start(timeout=300)
if self.with_mongo and self.base_mongo_cmd:
print('Setup Mongo')
@@ -940,7 +931,7 @@ class ClickHouseInstance:
if with_kerberized_kafka or with_kerberized_hdfs:
self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab"
- self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb_ch.conf:/etc/krb5.conf:ro"
+ self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro"
else:
self.keytab_path = ""
self.krb5_conf = ""
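The cluster helper now separates constructing the HDFS client (make_hdfs_api) from waiting for the service (wait_hdfs_to_start), which simply retries a test write until the timeout expires. A minimal sketch of that retry pattern follows; the helper name and signature are illustrative and not part of the test harness.

```python
import time

def wait_until_ready(probe, timeout=60, delay=1):
    # Retry `probe` until it stops raising or `timeout` seconds pass --
    # the same loop wait_hdfs_to_start() runs around hdfs_api.write_data().
    start = time.time()
    while time.time() - start < timeout:
        try:
            probe()
            return
        except Exception as ex:
            print("Can't connect to HDFS yet: {}".format(ex))
            time.sleep(delay)
    raise Exception("HDFS did not become ready within {} seconds".format(timeout))

# Usage sketch (assumes cluster.make_hdfs_api(kerberized=True) was already called):
# wait_until_ready(lambda: cluster.hdfs_api.write_data("/probe", "1"), timeout=300)
```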
diff --git a/tests/integration/helpers/hdfs_api.py b/tests/integration/helpers/hdfs_api.py
index 79dc72003a5..cb742662855 100644
--- a/tests/integration/helpers/hdfs_api.py
+++ b/tests/integration/helpers/hdfs_api.py
@@ -4,6 +4,7 @@ import gzip
import subprocess
import time
from tempfile import NamedTemporaryFile
+import requests
import requests_kerberos as reqkerb
import socket
import tempfile
@@ -13,9 +14,9 @@ import os
g_dns_hook = None
def custom_getaddrinfo(*args):
- print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
+ # print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None)
ret = g_dns_hook.custom_getaddrinfo(*args)
- print("g_dns_hook.custom_getaddrinfo result", ret)
+ # print("g_dns_hook.custom_getaddrinfo result", ret)
return ret
@@ -28,7 +29,7 @@ class mk_krb_conf(object):
with open(self.krb_conf) as f:
content = f.read()
amended_content = content.replace('hdfskerberos', self.kdc_ip)
- self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False)
+ self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False, mode="w+")
self.amended_krb_conf.write(amended_content)
self.amended_krb_conf.close()
return self.amended_krb_conf.name
@@ -36,38 +37,32 @@ class mk_krb_conf(object):
if self.amended_krb_conf is not None:
self.amended_krb_conf.close()
-
+# tweak DNS resolution so that requests to the HDFS host named in the URL are routed to localhost
class dns_hook(object):
def __init__(self, hdfs_api):
- print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
+ # print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port)
self.hdfs_api = hdfs_api
def __enter__(self):
global g_dns_hook
g_dns_hook = self
- if True: # self.hdfs_api.kerberized:
- print("g_dns_hook is None ", g_dns_hook is None)
- self.original_getaddrinfo = socket.getaddrinfo
- socket.getaddrinfo = custom_getaddrinfo
- return self
+ # print("g_dns_hook is None ", g_dns_hook is None)
+ self.original_getaddrinfo = socket.getaddrinfo
+ socket.getaddrinfo = custom_getaddrinfo
+ return self
def __exit__(self, type, value, traceback):
global g_dns_hook
g_dns_hook = None
- if True: # self.hdfs_api.kerberized:
- socket.getaddrinfo = self.original_getaddrinfo
+ socket.getaddrinfo = self.original_getaddrinfo
def custom_getaddrinfo(self, *args):
(hostname, port) = args[:2]
- print("top of custom_getaddrinfo", hostname, port)
+ # print("top of custom_getaddrinfo", hostname, port)
if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port):
- print("dns_hook substitute")
- return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))] #self.hdfs_api.hdfs_ip
+ # print("dns_hook substitute")
+ return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))]
else:
return self.original_getaddrinfo(*args)
-
-import requests
-
-
class HDFSApi(object):
def __init__(self, user, timeout=100, kerberized=False, principal=None,
keytab=None, krb_conf=None,
@@ -86,14 +81,11 @@ class HDFSApi(object):
self.kdc_ip = kdc_ip
self.krb_conf = krb_conf
- logging.basicConfig(level=logging.DEBUG)
- logging.getLogger().setLevel(logging.DEBUG)
- requests_log = logging.getLogger("requests.packages.urllib3")
- requests_log.setLevel(logging.DEBUG)
- requests_log.propagate = True
-
-
-
+ # logging.basicConfig(level=logging.DEBUG)
+ # logging.getLogger().setLevel(logging.DEBUG)
+ # requests_log = logging.getLogger("requests.packages.urllib3")
+ # requests_log.setLevel(logging.DEBUG)
+ # requests_log.propagate = True
if kerberized:
self._run_kinit()
@@ -109,23 +101,23 @@ class HDFSApi(object):
raise Exception("kerberos principal and keytab are required")
with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf:
- print("instantiated_krb_conf ", instantiated_krb_conf)
+ # print("instantiated_krb_conf ", instantiated_krb_conf)
os.environ["KRB5_CONFIG"] = instantiated_krb_conf
cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal)
- print(cmd)
+ # print(cmd)
start = time.time()
while time.time() - start < self.timeout:
try:
subprocess.call(cmd, shell=True)
- print "KDC started, kinit successfully run"
+ print("KDC started, kinit successfully run")
return
except Exception as ex:
- print "Can't run kinit ... waiting" + str(ex)
+ print("Can't run kinit ... waiting {}".format(str(ex)))
time.sleep(1)
raise Exception("Kinit running failure")
@@ -137,7 +129,7 @@ class HDFSApi(object):
response.raise_for_status()
# additional_params = '&'.join(response.headers['Location'].split('&')[1:2])
url = "{location}".format(location=response.headers['Location'])
- print("redirected to ", url)
+ # print("redirected to ", url)
with dns_hook(self):
response_data = requests.get(url,
headers={'host': 'localhost'},
@@ -149,20 +141,6 @@ class HDFSApi(object):
else:
return response_data.content
- # Requests can't put file
- def _curl_to_put(self, filename, path, params):
- url = "{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(protocol=self.protocol,
- host=self.host,
- port=self.data_port,
- path=path,
- params=params)
- if self.kerberized:
- cmd = "curl -k --negotiate -s -i -X PUT -T {fname} -u : '{url}' --resolve {host}:{port}:127.0.0.1".format(fname=filename, url=url)
- else:
- cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url)
- output = subprocess.check_output(cmd, shell=True)
- return output
-
def write_data(self, path, content):
named_file = NamedTemporaryFile(mode='wb+')
fpath = named_file.name
@@ -173,12 +151,9 @@ class HDFSApi(object):
if self.kerberized:
- print("before request.put", os.environ["KRB5_CONFIG"])
self._run_kinit()
- # cmd = "klist"
- # subprocess.call(cmd, shell=True)
self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal)
- print(self.kerberos_auth)
+ # print(self.kerberos_auth)
with dns_hook(self):
response = requests.put(
@@ -190,34 +165,26 @@ class HDFSApi(object):
params={'overwrite' : 'true'},
verify=False, auth=self.kerberos_auth
)
- print("after request.put", response.status_code)
if response.status_code != 307:
- print(response.headers)
+ # print(response.headers)
response.raise_for_status()
- print("after status code check")
-
additional_params = '&'.join(
response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"])
- if False: #not self.kerberized:
- output = self._curl_to_put(fpath, path, additional_params)
- if "201 Created" not in output:
- raise Exception("Can't create file on hdfs:\n {}".format(output))
- else:
- with dns_hook(self), open(fpath) as fh:
- file_data = fh.read()
- protocol = "http" # self.protocol
- response = requests.put(
- "{location}".format(location=response.headers['Location']),
- data=file_data,
- headers={'content-type':'text/plain', 'host': 'localhost'},
- params={'file': path, 'user.name' : self.user},
- allow_redirects=False, verify=False, auth=self.kerberos_auth
- )
- print(response)
- if response.status_code != 201:
- response.raise_for_status()
+ with dns_hook(self), open(fpath, mode="rb") as fh:
+ file_data = fh.read()
+ protocol = "http" # self.protocol
+ response = requests.put(
+ "{location}".format(location=response.headers['Location']),
+ data=file_data,
+ headers={'content-type':'text/plain', 'host': 'localhost'},
+ params={'file': path, 'user.name' : self.user},
+ allow_redirects=False, verify=False, auth=self.kerberos_auth
+ )
+ # print(response)
+ if response.status_code != 201:
+ response.raise_for_status()
def write_gzip_data(self, path, content):
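dns_hook above works by temporarily monkey-patching socket.getaddrinfo so that the WebHDFS host and ports appearing in redirect URLs resolve to 127.0.0.1, where the container ports are published. A stripped-down sketch of that pattern, with an illustrative class name that does not exist in the helpers:

```python
import socket

class resolve_to_localhost:
    """Temporarily make selected (host, port) pairs resolve to 127.0.0.1.

    Sketch of the dns_hook idea from tests/integration/helpers/hdfs_api.py;
    the class name and constructor here are illustrative only.
    """
    def __init__(self, host, ports):
        self.host = host
        self.ports = set(ports)

    def __enter__(self):
        self.original_getaddrinfo = socket.getaddrinfo

        def patched(hostname, port, *args, **kwargs):
            if hostname == self.host and port in self.ports:
                return [(socket.AF_INET, socket.SOCK_STREAM, 6, '', ("127.0.0.1", port))]
            return self.original_getaddrinfo(hostname, port, *args, **kwargs)

        socket.getaddrinfo = patched
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        socket.getaddrinfo = self.original_getaddrinfo

# Usage sketch:
# with resolve_to_localhost("kerberizedhdfs1", [50070, 1006]):
#     ...  # HTTP requests issued here see the WebHDFS ports on localhost
```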
diff --git a/tests/integration/test_allowed_url_from_config/test.py b/tests/integration/test_allowed_url_from_config/test.py
index 44715d92121..6442937c8f4 100644
--- a/tests/integration/test_allowed_url_from_config/test.py
+++ b/tests/integration/test_allowed_url_from_config/test.py
@@ -1,6 +1,5 @@
import pytest
from helpers.cluster import ClickHouseCluster
-from helpers.hdfs_api import HDFSApi
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', main_configs=['configs/config_with_hosts.xml'])
@@ -101,9 +100,8 @@ def test_table_function_remote(start_cluster):
def test_redirect(start_cluster):
- hdfs_api = HDFSApi("root")
- hdfs_api.write_data("/simple_storage", "1\t\n")
- assert hdfs_api.read_data("/simple_storage") == "1\t\n"
+ start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n")
+ assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n"
node7.query(
"CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)")
assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1")
diff --git a/tests/integration/test_redirect_url_storage/test.py b/tests/integration/test_redirect_url_storage/test.py
index f93548af0db..f2731794d43 100644
--- a/tests/integration/test_redirect_url_storage/test.py
+++ b/tests/integration/test_redirect_url_storage/test.py
@@ -17,9 +17,8 @@ def started_cluster():
def test_url_without_redirect(started_cluster):
- hdfs_api = HDFSApi("root")
- hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
- assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+ started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+ assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access datanode port directly
node1.query(
@@ -28,9 +27,8 @@ def test_url_without_redirect(started_cluster):
def test_url_with_redirect_not_allowed(started_cluster):
- hdfs_api = HDFSApi("root")
- hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
- assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+ started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+ assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port without allowing redirects
node1.query(
@@ -40,9 +38,8 @@ def test_url_with_redirect_not_allowed(started_cluster):
def test_url_with_redirect_allowed(started_cluster):
- hdfs_api = HDFSApi("root")
- hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
- assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
+ started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n")
+ assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n"
# access proxy port with allowing redirects
# http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0
diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py
index eebc52a01a3..a6c8b7e1ee9 100644
--- a/tests/integration/test_storage_hdfs/test.py
+++ b/tests/integration/test_storage_hdfs/test.py
@@ -203,5 +203,5 @@ def test_write_gzip_storage(started_cluster):
if __name__ == '__main__':
cluster.start()
- raw_input("Cluster created, press any key to destroy...")
+ input("Cluster created, press any key to destroy...")
cluster.shutdown()
diff --git a/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf b/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf
index 98cc1e6810a..2c1e6f15f77 100644
--- a/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf
+++ b/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf
@@ -7,8 +7,7 @@
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
- ticket_lifetime = 15d
- # renew_lifetime = 15d
+ ticket_lifetime = 15s
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
diff --git a/tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf b/tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf
similarity index 92%
rename from tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf
rename to tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf
index 850e9622109..ec1c54240e8 100644
--- a/tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf
+++ b/tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf
@@ -7,8 +7,7 @@
default_realm = TEST.CLICKHOUSE.TECH
dns_lookup_realm = false
dns_lookup_kdc = false
- ticket_lifetime = 15s
- # renew_lifetime = 15d
+ ticket_lifetime = 15d
forwardable = true
default_tgs_enctypes = des3-hmac-sha1
default_tkt_enctypes = des3-hmac-sha1
diff --git a/tests/integration/test_storage_kerberized_hdfs/test.py b/tests/integration/test_storage_kerberized_hdfs/test.py
index 9e9412a99ed..b5b330f7c78 100644
--- a/tests/integration/test_storage_kerberized_hdfs/test.py
+++ b/tests/integration/test_storage_kerberized_hdfs/test.py
@@ -6,7 +6,6 @@ import os
from helpers.cluster import ClickHouseCluster
import subprocess
-
cluster = ClickHouseCluster(__file__)
node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml'])
@@ -24,59 +23,40 @@ def started_cluster():
cluster.shutdown()
def test_read_table(started_cluster):
- # hdfs_api = HDFSApi("root")
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function", data)
api_read = started_cluster.hdfs_api.read_data("/simple_table_function")
- print("api_read", api_read)
-
assert api_read == data
select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')")
- print("select_read", select_read)
-
assert select_read == data
def test_read_write_storage(started_cluster):
- # node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberized_hdfs1.test.clickhouse.tech:9000/simple_storage', 'TSV')")
node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')")
node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)")
api_read = started_cluster.hdfs_api.read_data("/simple_storage1")
- print("api_read", api_read)
assert api_read == "1\tMark\t72.53\n"
select_read = node1.query("select * from SimpleHDFSStorage2")
- print("select_read", select_read)
assert select_read == "1\tMark\t72.53\n"
-def test_write_storage_expired(started_cluster):
- node1.query("create table SimpleHDFSStorageExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_expired', 'TSV')")
+def test_write_storage_not_expired(started_cluster):
+ node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_not_expired', 'TSV')")
time.sleep(45) # wait for ticket expiration
- node1.query("insert into SimpleHDFSStorageExpired values (1, 'Mark', 72.53)")
+ node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)")
- api_read = started_cluster.hdfs_api.read_data("/simple_storage_expired")
- print("api_read", api_read)
+ api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired")
assert api_read == "1\tMark\t72.53\n"
- select_read = node1.query("select * from SimpleHDFSStorageExpired")
- print("select_read", select_read)
+ select_read = node1.query("select * from SimpleHDFSStorageNotExpired")
assert select_read == "1\tMark\t72.53\n"
-def test_prohibited(started_cluster):
- node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
- try:
- node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
- assert False, "Exception have to be thrown"
- except Exception as ex:
- assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)
-
-
def test_two_users(started_cluster):
node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')")
node1.query("insert into HDFSStorOne values (1, 'IlyaReal', 86.00)")
@@ -85,31 +65,10 @@ def test_two_users(started_cluster):
node1.query("insert into HDFSStorTwo values (1, 'IlyaIdeal', 74.00)")
select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')")
- print("select_read_1", select_read_1)
select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')")
- print("select_read_2", select_read_2)
- # node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')")
- # try:
- # node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)")
- # assert False, "Exception have to be thrown"
- # except Exception as ex:
- # print ex
- # assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex)
-
-
-def test_cache_path(started_cluster):
- node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
- try:
- node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
- assert False, "Exception have to be thrown"
- except Exception as ex:
- assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)
-
-
-
-def test_read_table_not_expired(started_cluster):
+def test_read_table_expired(started_cluster):
data = "1\tSerialize\t555.222\n2\tData\t777.333\n"
started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data)
@@ -125,13 +84,25 @@ def test_read_table_not_expired(started_cluster):
started_cluster.unpause_container('hdfskerberos')
+def test_prohibited(started_cluster):
+ node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')")
+ try:
+ node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)")
+ assert False, "Exception has to be thrown"
+ except Exception as ex:
+ assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex)
-@pytest.mark.timeout(999999)
-def _test_sleep_forever(started_cluster):
- time.sleep(999999)
+
+def test_cache_path(started_cluster):
+ node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')")
+ try:
+ node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)")
+ assert False, "Exception has to be thrown"
+ except Exception as ex:
+ assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex)
if __name__ == '__main__':
cluster.start()
- raw_input("Cluster created, press any key to destroy...")
+ input("Cluster created, press any key to destroy...")
cluster.shutdown()