From 7e9781485900af0240af7eabf69d7f2b537d4a78 Mon Sep 17 00:00:00 2001 From: Ilya Golshtein Date: Fri, 30 Oct 2020 22:40:16 +0300 Subject: [PATCH] cleanup, fixes, new submodules, ShellCommand, WriteBufferFromString --- contrib/libgsasl | 2 +- contrib/libhdfs3 | 2 +- contrib/libhdfs3-cmake/CMakeLists.txt | 10 +- .../integration/kerberized_hadoop/Dockerfile | 10 ++ docker/test/integration/runner/Dockerfile | 5 +- .../docker_compose_kerberized_hdfs.yml | 6 +- .../table-engines/integrations/hdfs.md | 8 +- programs/server/config.xml | 29 +---- src/CMakeLists.txt | 8 +- src/Core/BackgroundSchedulePool.h | 9 -- src/{IO => Storages/HDFS}/HDFSCommon.cpp | 97 +++------------ src/{IO => Storages/HDFS}/HDFSCommon.h | 82 +++---------- .../HDFS}/ReadBufferFromHDFS.cpp | 7 +- .../HDFS}/ReadBufferFromHDFS.h | 11 +- src/Storages/{ => HDFS}/StorageHDFS.cpp | 10 +- src/Storages/{ => HDFS}/StorageHDFS.h | 0 .../HDFS}/WriteBufferFromHDFS.cpp | 9 +- .../HDFS}/WriteBufferFromHDFS.h | 1 - src/TableFunctions/TableFunctionHDFS.cpp | 2 +- tests/integration/helpers/cluster.py | 43 +++---- tests/integration/helpers/hdfs_api.py | 111 ++++++------------ .../test_allowed_url_from_config/test.py | 6 +- .../test_redirect_url_storage/test.py | 15 +-- tests/integration/test_storage_hdfs/test.py | 2 +- .../secrets/krb.conf | 3 +- .../secrets/{krb_ch.conf => krb_long.conf} | 3 +- .../test_storage_kerberized_hdfs/test.py | 73 ++++-------- 27 files changed, 179 insertions(+), 385 deletions(-) create mode 100644 docker/test/integration/kerberized_hadoop/Dockerfile rename src/{IO => Storages/HDFS}/HDFSCommon.cpp (61%) rename src/{IO => Storages/HDFS}/HDFSCommon.h (60%) rename src/{IO => Storages/HDFS}/ReadBufferFromHDFS.cpp (93%) rename src/{IO => Storages/HDFS}/ReadBufferFromHDFS.h (81%) rename src/Storages/{ => HDFS}/StorageHDFS.cpp (98%) rename src/Storages/{ => HDFS}/StorageHDFS.h (100%) rename src/{IO => Storages/HDFS}/WriteBufferFromHDFS.cpp (85%) rename src/{IO => Storages/HDFS}/WriteBufferFromHDFS.h (95%) rename tests/integration/test_storage_kerberized_hdfs/secrets/{krb_ch.conf => krb_long.conf} (92%) diff --git a/contrib/libgsasl b/contrib/libgsasl index 140fb582505..383ee28e82f 160000 --- a/contrib/libgsasl +++ b/contrib/libgsasl @@ -1 +1 @@ -Subproject commit 140fb58250588c8323285b75fcf127c4adc33dfa +Subproject commit 383ee28e82f69fa16ed43b48bd9c8ee5b313ab84 diff --git a/contrib/libhdfs3 b/contrib/libhdfs3 index 30552ac527f..095b9d48b40 160000 --- a/contrib/libhdfs3 +++ b/contrib/libhdfs3 @@ -1 +1 @@ -Subproject commit 30552ac527f2c14070d834e171493b2e7f662375 +Subproject commit 095b9d48b400abb72d967cb0539af13b1e3d90cf diff --git a/contrib/libhdfs3-cmake/CMakeLists.txt b/contrib/libhdfs3-cmake/CMakeLists.txt index e1129a4bd59..49b35d09431 100644 --- a/contrib/libhdfs3-cmake/CMakeLists.txt +++ b/contrib/libhdfs3-cmake/CMakeLists.txt @@ -33,6 +33,11 @@ set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH}) include(Platform) include(Options) +# # prefer shared libraries +# if (WITH_KERBEROS) +# find_package(KERBEROS REQUIRED) +# endif() + # source set(PROTO_FILES #${HDFS3_SOURCE_DIR}/proto/encryption.proto @@ -207,14 +212,11 @@ target_include_directories(hdfs3 PRIVATE ${HDFS3_COMMON_DIR}) target_include_directories(hdfs3 PRIVATE ${CMAKE_CURRENT_BINARY_DIR}) target_include_directories(hdfs3 PRIVATE ${LIBGSASL_INCLUDE_DIR}) -# if (WITH_KERBEROS) -# target_include_directories(hdfs3 PRIVATE ${KERBEROS_INCLUDE_DIRS}) -# endif() target_include_directories(hdfs3 PRIVATE ${LIBXML2_INCLUDE_DIR}) 
target_link_libraries(hdfs3 PRIVATE ${LIBGSASL_LIBRARY}) if (WITH_KERBEROS) - target_link_libraries(hdfs3 PUBLIC ${KRB5_LIBRARY}) + target_link_libraries(hdfs3 PRIVATE ${KRB5_LIBRARY}) endif() target_link_libraries(hdfs3 PRIVATE ${LIBXML2_LIBRARIES}) diff --git a/docker/test/integration/kerberized_hadoop/Dockerfile b/docker/test/integration/kerberized_hadoop/Dockerfile new file mode 100644 index 00000000000..08beab91a56 --- /dev/null +++ b/docker/test/integration/kerberized_hadoop/Dockerfile @@ -0,0 +1,10 @@ +# docker build -t ilejn/kerberized-hadoop . +FROM sequenceiq/hadoop-docker:2.7.0 +RUN yum --quiet --assumeyes install krb5-workstation.x86_64 +RUN cd /tmp && \ + curl http://archive.apache.org/dist/commons/daemon/source/commons-daemon-1.0.15-src.tar.gz -o commons-daemon-1.0.15-src.tar.gz && \ + tar xzf commons-daemon-1.0.15-src.tar.gz && \ + cd commons-daemon-1.0.15-src/src/native/unix && \ + ./configure && \ + make && \ + cp ./jsvc /usr/local/hadoop/sbin diff --git a/docker/test/integration/runner/Dockerfile b/docker/test/integration/runner/Dockerfile index 36188fc4a63..9b51891ccf5 100644 --- a/docker/test/integration/runner/Dockerfile +++ b/docker/test/integration/runner/Dockerfile @@ -29,6 +29,8 @@ RUN apt-get update \ libcurl4-openssl-dev \ gdb \ software-properties-common \ + libkrb5-dev \ + krb5-user \ && rm -rf \ /var/lib/apt/lists/* \ /var/cache/debconf \ @@ -75,7 +77,8 @@ RUN python3 -m pip install \ pytest-timeout \ redis \ tzlocal \ - urllib3 + urllib3 \ + requests-kerberos COPY modprobe.sh /usr/local/bin/modprobe COPY dockerd-entrypoint.sh /usr/local/bin/ diff --git a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml index 787d88880dd..de6bfa16785 100644 --- a/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml +++ b/docker/test/integration/runner/compose/docker_compose_kerberized_hdfs.yml @@ -3,14 +3,14 @@ version: '2.3' services: kerberizedhdfs1: cap_add: - - CAP_DAC_READ_SEARCH + - DAC_READ_SEARCH image: ilejn/kerberized-hadoop:latest hostname: kerberizedhdfs1 restart: always volumes: - ${KERBERIZED_HDFS_DIR}/../../hdfs_configs/bootstrap.sh:/etc/bootstrap.sh:ro - ${KERBERIZED_HDFS_DIR}/secrets:/usr/local/hadoop/etc/hadoop/conf - - ${KERBERIZED_HDFS_DIR}/secrets/krb.conf:/etc/krb5.conf:ro + - ${KERBERIZED_HDFS_DIR}/secrets/krb_long.conf:/etc/krb5.conf:ro ports: - 1006:1006 - 50070:50070 @@ -20,7 +20,7 @@ services: entrypoint: /etc/bootstrap.sh -d hdfskerberos: - image: yandex/clickhouse-kerberos-kdc:latest + image: yandex/clickhouse-kerberos-kdc:${DOCKER_KERBEROS_KDC_TAG} hostname: hdfskerberos volumes: - ${KERBERIZED_HDFS_DIR}/secrets:/tmp/keytab diff --git a/docs/en/engines/table-engines/integrations/hdfs.md b/docs/en/engines/table-engines/integrations/hdfs.md index d621c4c30c7..e85982e2b85 100644 --- a/docs/en/engines/table-engines/integrations/hdfs.md +++ b/docs/en/engines/table-engines/integrations/hdfs.md @@ -183,12 +183,17 @@ hadoop\_kerberos\_keytab hadoop\_kerberos\_principal hadoop\_kerberos\_kinit\_command +#### Limitations {#limitations} + + * hadoop\_security\_kerberos\_ticket\_cache\_path can be global only, not user specific + ## Kerberos support {#kerberos-support} If hadoop\_security\_authentication parameter has value 'kerberos', ClickHouse authentifies via Kerberos facility. Parameters [here](#clickhouse-extras) and hadoop\_security\_kerberos\_ticket\_cache\_path may be of help. 
Note that due to libhdfs3 limitations only old-fashioned approach is supported, -datanode communications are not secured by SASL. Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference. +datanode communications are not secured by SASL (HADOOP\_SECURE\_DN\_USER is a reliable indicator of such +security approach). Use tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh for reference. ## Virtual Columns {#virtual-columns} @@ -199,5 +204,4 @@ datanode communications are not secured by SASL. Use tests/integration/test\_sto - [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns) - [Original article](https://clickhouse.tech/docs/en/operations/table_engines/hdfs/) diff --git a/programs/server/config.xml b/programs/server/config.xml index 9ea8c2e5b51..f41c346bbed 100644 --- a/programs/server/config.xml +++ b/programs/server/config.xml @@ -887,31 +887,6 @@ --> - Uncomment to disable ClickHouse internal DNS caching. - 1 - - - - - - - - - - - - - - - - - - - - - - - - - + + diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 6021065f937..46956181974 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -88,6 +88,10 @@ if (USE_AWS_S3) add_headers_and_sources(dbms Disks/S3) endif() +if (USE_HDFS) + add_headers_and_sources(dbms Storages/HDFS) +endif() + list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD}) list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON}) @@ -389,8 +393,8 @@ if (USE_GRPC) endif() if (USE_HDFS) - target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY}) - target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR}) + dbms_target_link_libraries(PRIVATE ${HDFS3_LIBRARY}) + dbms_target_include_directories (SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR}) endif() if (USE_AWS_S3) diff --git a/src/Core/BackgroundSchedulePool.h b/src/Core/BackgroundSchedulePool.h index d9fe9af789c..092824c069a 100644 --- a/src/Core/BackgroundSchedulePool.h +++ b/src/Core/BackgroundSchedulePool.h @@ -148,11 +148,6 @@ using BackgroundSchedulePoolTaskInfoPtr = std::shared_ptr; - - CleanupFunc cleanup_func; - - BackgroundSchedulePoolTaskHolder() = default; explicit BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskInfoPtr & task_info_) : task_info(task_info_) {} BackgroundSchedulePoolTaskHolder(const BackgroundSchedulePoolTaskHolder & other) = delete; @@ -164,8 +159,6 @@ public: { if (task_info) task_info->deactivate(); - if (cleanup_func) - cleanup_func(); } operator bool() const { return task_info != nullptr; } @@ -173,8 +166,6 @@ public: BackgroundSchedulePoolTaskInfo * operator->() { return task_info.get(); } const BackgroundSchedulePoolTaskInfo * operator->() const { return task_info.get(); } - void setCleanupFunc(const CleanupFunc function) {cleanup_func = function;} - private: BackgroundSchedulePoolTaskInfoPtr task_info; }; diff --git a/src/IO/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp similarity index 61% rename from src/IO/HDFSCommon.cpp rename to src/Storages/HDFS/HDFSCommon.cpp index a68b4a96a99..72004f9b7d0 100644 --- a/src/IO/HDFSCommon.cpp +++ b/src/Storages/HDFS/HDFSCommon.cpp @@ -1,12 +1,12 @@ -#include +#include #include -#include #include -#include - #if USE_HDFS +#include #include +#include +#include namespace DB { @@ -19,25 +19,9 @@ extern const int EXCESSIVE_ELEMENT_IN_CONFIG; const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs"; -// void HDFSBuilderWrapper::makeCachePath(const String & 
cachePath, String user) -// { -// if (hadoop_security_kerberos_ticket_cache_path.empty()) -// { -// hadoop_security_kerberos_ticket_cache_path = cachePath + "KRB5CACHEPATH" + user; -// hdfsBuilderSetKerbTicketCachePath(hdfs_builder, hadoop_security_kerberos_ticket_cache_path.c_str()); -// } -// } - void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser) { - hdfsBuilderConfSetStr(hdfs_builder, "input.read.timeout", "60000"); // 1 min - hdfsBuilderConfSetStr(hdfs_builder, "input.write.timeout", "60000"); // 1 min - hdfsBuilderConfSetStr(hdfs_builder, "input.connect.timeout", "60000"); // 1 min - - // hdfsBuilderConfSetStr(rawBuilder, "hadoop.security.authentication", "kerberos"); - // hdfsBuilderConfSetStr(rawBuilder, "dfs.client.log.severity", "TRACE"); - Poco::Util::AbstractConfiguration::Keys keys; config.keys(config_path, keys); @@ -85,20 +69,17 @@ void HDFSBuilderWrapper::loadFromConfig(const Poco::Util::AbstractConfiguration const auto & [k,v] = keep(key_name, config.getString(key_path)); hdfsBuilderConfSetStr(hdfs_builder, k.c_str(), v.c_str()); - } } String HDFSBuilderWrapper::getKinitCmd() { - std::stringstream ss; -<<<<<<< HEAD + WriteBufferFromOwnString ss; String cache_name = hadoop_security_kerberos_ticket_cache_path.empty() ? String() : (String(" -c \"") + hadoop_security_kerberos_ticket_cache_path + "\""); - ss << hadoop_kerberos_kinit_command << cache_name << " -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal << "|| " << hadoop_kerberos_kinit_command << cache_name << " -t \"" << @@ -113,21 +94,14 @@ void HDFSBuilderWrapper::runKinit() std::unique_lock lck(kinit_mtx); - int ret = system(cmd.c_str()); - if (ret) - { // check it works !! 
- throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR); + auto command = ShellCommand::execute(cmd); + auto status = command->tryWait(); + if (status) + { + throw Exception("kinit failure: " + cmd, ErrorCodes::BAD_ARGUMENTS); } } - -======= - ss << "kinit -R -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal << - "|| kinit -t \"" << hadoop_kerberos_keytab << "\" -k " << hadoop_kerberos_principal; - return ss.str(); -} - ->>>>>>> kerberized hdfs compiled HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context) { const Poco::URI uri(uri_str); @@ -142,46 +116,15 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + String(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR); - // hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min - // hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min - // hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min - - // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos"); - // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE"); - - const auto & config = context.getConfigRef(); - if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX)) - { - builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX); - if (builder.needKinit) - { - String cmd = builder.getKinitCmd(); - int ret = system(cmd.c_str()); - if (ret) - { - throw Exception("kinit failure: " + std::to_string(ret) + " " + cmd, ErrorCodes::NETWORK_ERROR); - } - } - } - -<<<<<<< HEAD - - // hdfsBuilderConfSetStr(builder.get(), "hadoop.security.authentication", "kerberos"); - // hdfsBuilderConfSetStr(builder.get(), "dfs.client.log.severity", "TRACE"); - - const auto & config = context.getConfigRef(); + hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min + hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min + hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min String user_info = uri.getUserInfo(); String user; if (!user_info.empty() && user_info.front() != ':') { -======= - String user_info = uri.getUserInfo(); - if (!user_info.empty() && user_info.front() != ':') - { - String user; ->>>>>>> kerberized hdfs compiled size_t delim_pos = user_info.find(':'); if (delim_pos != String::npos) user = user_info.substr(0, delim_pos); @@ -196,11 +139,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con hdfsBuilderSetNameNodePort(builder.get(), port); } - + // const auto & config = context.getGlobalContext().getConfigRef(); + const auto & config = context.getConfigRef(); if (config.has(HDFSBuilderWrapper::CONFIG_PREFIX)) { builder.loadFromConfig(config, HDFSBuilderWrapper::CONFIG_PREFIX); - // builder.makeCachePath(context.getUserFilesPath()); } if (!user.empty()) @@ -211,17 +154,11 @@ HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & con #if USE_INTERNAL_HDFS3_LIBRARY builder.loadFromConfig(config, user_config_prefix, true); #else - throw Exception("Multi user HDFS configuration required internal libhdfs3", - ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); + throw Exception("Multi user HDFS configuration required internal libhdfs3", + ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG); #endif } - // builder.makeCachePath(context.getUserFilesPath(), user); } - 
// else - // { - // builder.makeCachePath(context.getUserFilesPath()); - // } - if (builder.needKinit) { diff --git a/src/IO/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h similarity index 60% rename from src/IO/HDFSCommon.h rename to src/Storages/HDFS/HDFSCommon.h index bd49c5ebd3e..92fdb4f6843 100644 --- a/src/IO/HDFSCommon.h +++ b/src/Storages/HDFS/HDFSCommon.h @@ -1,74 +1,31 @@ #pragma once + #include + +#if USE_HDFS #include #include #include -#include -#if USE_HDFS #include -#include +#include +#include + +#include +#include + namespace DB { namespace detail { -/* struct HDFSBuilderDeleter */ -/* { */ -/* void operator()(hdfsBuilder * builder_ptr) */ -/* { */ -/* hdfsFreeBuilder(builder_ptr); */ -/* } */ -/* }; */ -struct HDFSFsDeleter -{ - void operator()(hdfsFS fs_ptr) + struct HDFSFsDeleter { - hdfsDisconnect(fs_ptr); - } -}; - - - -#if 0 - -class KinitTaskHolder -{ - using Container = std::map; - Container container; - - - String make_key(const HDFSBuilderWrapper & hbw) - { - return hbw.hadoop_kerberos_keytab + "^" - + hbw.hadoop_kerberos_principal + "^" - + std::to_string(time_relogin); - } - -public: - using Descriptor = Container::iterator; - - Descriptor addTask(const HDFSBuilderWrapper & hdfs_builder_wrapper) - { - auto key = make_key(hdfs_builder_wrapper); - - auto it = container.find(key); - if ( it != std::end(container)) + void operator()(hdfsFS fs_ptr) { - it = container.insert({key, task}).first; + hdfsDisconnect(fs_ptr); } - - return it.second->getptr(); - - } - void delTask(Descriptor it) - { - container.erase(it); - } -}; - -#endif - + }; } struct HDFSFileInfo @@ -92,26 +49,25 @@ struct HDFSFileInfo } }; - class HDFSBuilderWrapper { hdfsBuilder * hdfs_builder; String hadoop_kerberos_keytab; String hadoop_kerberos_principal; String hadoop_kerberos_kinit_command = "kinit"; - String hadoop_security_kerberos_ticket_cache_path; + String hadoop_security_kerberos_ticket_cache_path; static std::mutex kinit_mtx; std::vector> config_stor; + // hdfs builder relies on an external config data storage std::pair& keep(const String & k, const String & v) { return config_stor.emplace_back(std::make_pair(k, v)); } - void loadFromConfig(const Poco::Util::AbstractConfiguration & config, - const String & config_path, bool isUser = false); + void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false); String getKinitCmd(); @@ -119,8 +75,6 @@ class HDFSBuilderWrapper void runKinit(); - void makeCachePath(const String & cachePath, String user = ""); - static const String CONFIG_PREFIX; public: @@ -148,10 +102,6 @@ public: friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Context & context); }; - - - -/* using HDFSBuilderPtr = std::unique_ptr; */ using HDFSFSPtr = std::unique_ptr, detail::HDFSFsDeleter>; // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large diff --git a/src/IO/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp similarity index 93% rename from src/IO/ReadBufferFromHDFS.cpp rename to src/Storages/HDFS/ReadBufferFromHDFS.cpp index 9caafda957c..20340136e3d 100644 --- a/src/IO/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -1,8 +1,7 @@ #include "ReadBufferFromHDFS.h" #if USE_HDFS -#include -#include +#include #include #include @@ -28,11 +27,11 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl HDFSFSPtr fs; explicit ReadBufferFromHDFSImpl(const std::string & hdfs_name_, const Context & context_) - : hdfs_uri(hdfs_name_) + : 
hdfs_uri(hdfs_name_), + builder(createHDFSBuilder(hdfs_uri, context_)) { std::lock_guard lock(hdfs_init_mutex); - builder = createHDFSBuilder(hdfs_uri, context_); fs = createHDFSFS(builder.get()); const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2); const std::string path = hdfs_uri.substr(begin_of_path); diff --git a/src/IO/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h similarity index 81% rename from src/IO/ReadBufferFromHDFS.h rename to src/Storages/HDFS/ReadBufferFromHDFS.h index 1208406c7ee..ce71338a7fd 100644 --- a/src/IO/ReadBufferFromHDFS.h +++ b/src/Storages/HDFS/ReadBufferFromHDFS.h @@ -4,11 +4,17 @@ #if USE_HDFS #include -#include #include #include #include +#include + +#include + +#include + + namespace DB { /** Accepts HDFS path to file and opens it. @@ -20,9 +26,6 @@ class ReadBufferFromHDFS : public BufferWithOwnMemory std::unique_ptr impl; public: ReadBufferFromHDFS(const std::string & hdfs_name_, const Context & context, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE); - - // ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default; - ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default; ~ReadBufferFromHDFS() override; bool nextImpl() override; diff --git a/src/Storages/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp similarity index 98% rename from src/Storages/StorageHDFS.cpp rename to src/Storages/HDFS/StorageHDFS.cpp index 49fa4444e0c..1f85388989e 100644 --- a/src/Storages/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -3,15 +3,15 @@ #if USE_HDFS #include -#include +#include #include #include #include #include -#include -#include +#include +#include #include -#include +#include #include #include #include @@ -176,7 +176,7 @@ public: HDFSBlockOutputStream(const String & uri, const String & format, const Block & sample_block_, - Context & context, + const Context & context, const CompressionMethod compression_method) : sample_block(sample_block_) { diff --git a/src/Storages/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h similarity index 100% rename from src/Storages/StorageHDFS.h rename to src/Storages/HDFS/StorageHDFS.h diff --git a/src/IO/WriteBufferFromHDFS.cpp b/src/Storages/HDFS/WriteBufferFromHDFS.cpp similarity index 85% rename from src/IO/WriteBufferFromHDFS.cpp rename to src/Storages/HDFS/WriteBufferFromHDFS.cpp index f1c98885c72..b3684826128 100644 --- a/src/IO/WriteBufferFromHDFS.cpp +++ b/src/Storages/HDFS/WriteBufferFromHDFS.cpp @@ -3,8 +3,8 @@ #if USE_HDFS #include -#include -#include +#include +#include #include @@ -37,9 +37,6 @@ struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl if (path.find_first_of("*?{") != std::string::npos) throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE); - // int flags = hdfsExists(fs.get(), path.c_str()) ? 
(O_WRONLY|O_SYNC) : (O_WRONLY|O_APPEND|O_SYNC); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here - // fout = hdfsOpenFile(fs.get(), path.c_str(), flags, 0, 0, 1024*1024); - if (!hdfsExists(fs.get(), path.c_str())) throw Exception("File: " + path + " is already exists", ErrorCodes::BAD_ARGUMENTS); fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY meaning create or overwrite i.e., implies O_TRUNCAT here @@ -80,8 +77,6 @@ WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, const C : BufferWithOwnMemory(buf_size) , impl(std::make_unique(hdfs_name_, context)) { - // auto modified_context = std::make_shared(context); - // impl = std::make_unique(hdfs_name_, modified_context); } diff --git a/src/IO/WriteBufferFromHDFS.h b/src/Storages/HDFS/WriteBufferFromHDFS.h similarity index 95% rename from src/IO/WriteBufferFromHDFS.h rename to src/Storages/HDFS/WriteBufferFromHDFS.h index f633d28e6f3..a404935fd23 100644 --- a/src/IO/WriteBufferFromHDFS.h +++ b/src/Storages/HDFS/WriteBufferFromHDFS.h @@ -4,7 +4,6 @@ #if USE_HDFS #include -#include #include #include #include diff --git a/src/TableFunctions/TableFunctionHDFS.cpp b/src/TableFunctions/TableFunctionHDFS.cpp index e2f227ef7b5..700cb93ca06 100644 --- a/src/TableFunctions/TableFunctionHDFS.cpp +++ b/src/TableFunctions/TableFunctionHDFS.cpp @@ -2,7 +2,7 @@ #include "registerTableFunctions.h" #if USE_HDFS -#include +#include #include #include #include diff --git a/tests/integration/helpers/cluster.py b/tests/integration/helpers/cluster.py index c7dff0a816a..a65a420cd5b 100644 --- a/tests/integration/helpers/cluster.py +++ b/tests/integration/helpers/cluster.py @@ -169,7 +169,7 @@ class ClickHouseCluster: cmd += " client" return cmd - def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries = None, + def add_instance(self, name, base_config_dir=None, main_configs=None, user_configs=None, dictionaries=None, macros=None, with_zookeeper=False, with_mysql=False, with_kafka=False, with_kerberized_kafka=False, with_rabbitmq=False, clickhouse_path_dir=None, @@ -321,7 +321,8 @@ class ClickHouseCluster: self.with_kerberized_hdfs = True self.base_cmd.extend(['--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')]) self.base_kerberized_hdfs_cmd = ['docker-compose', '--project-directory', self.base_dir, '--project-name', - self.project_name, '--file', p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')] + self.project_name, '--file', + p.join(docker_compose_yml_dir, 'docker_compose_kerberized_hdfs.yml')] cmds.append(self.base_kerberized_hdfs_cmd) if with_mongo and not self.with_mongo: @@ -485,38 +486,32 @@ class ClickHouseCluster: raise Exception("Cannot wait ZooKeeper container") - def wait_hdfs_to_start(self, timeout=60, kerberized=False): - start = time.time() + def make_hdfs_api(self, timeout=60, kerberized=False): if kerberized: keytab = p.abspath(p.join(self.instances['node1'].path, "secrets/clickhouse.keytab")) - krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb.conf")) + krb_conf = p.abspath(p.join(self.instances['node1'].path, "secrets/krb_long.conf")) hdfs_ip = self.get_instance_ip('kerberizedhdfs1') - print("kerberizedhdfs1 ip ", hdfs_ip) + # print("kerberizedhdfs1 ip ", hdfs_ip) kdc_ip = self.get_instance_ip('hdfskerberos') - print("kdc_ip ", kdc_ip) + # print("kdc_ip ", kdc_ip) self.hdfs_api = HDFSApi(user="root", timeout=timeout, kerberized=True, 
principal="root@TEST.CLICKHOUSE.TECH", keytab=keytab, krb_conf=krb_conf, - # host="kerberizedhdfs1.test.clickhouse.tech", host="kerberizedhdfs1", protocol="http", - # protocol="https", proxy_port=50070, - # proxy_port=50470, - # data_port=50475, data_port=1006, hdfs_ip=hdfs_ip, kdc_ip=kdc_ip) - # self.hdfs_api = hdfs_api else: self.hdfs_api = HDFSApi(user="root", host="hdfs1") - # time.sleep(150) - # return + def wait_hdfs_to_start(self, timeout=60): + start = time.time() while time.time() - start < timeout: try: self.hdfs_api.write_data("/somefilewithrandomname222", "1") @@ -658,6 +653,7 @@ class ClickHouseCluster: self.wait_schema_registry_to_start(120) if self.with_kerberized_kafka and self.base_kerberized_kafka_cmd: + print('Setup kerberized kafka') env = os.environ.copy() env['KERBERIZED_KAFKA_DIR'] = instance.path + '/' subprocess.check_call(self.base_kerberized_kafka_cmd + common_opts + ['--renew-anon-volumes'], env=env) @@ -669,21 +665,16 @@ class ClickHouseCluster: if self.with_hdfs and self.base_hdfs_cmd: print('Setup HDFS') subprocess_check_call(self.base_hdfs_cmd + common_opts) + self.make_hdfs_api() self.wait_hdfs_to_start(120) if self.with_kerberized_hdfs and self.base_kerberized_hdfs_cmd: print('Setup kerberized HDFS') - env_var = {} - env_var['KERBERIZED_HDFS_DIR'] = instance.path + '/' - - # different docker_compose versions look for .env in different places - # -- env-file too recent to rely on it - files_to_cleanup = [] - files_to_cleanup.append(_create_env_file(self.base_dir, env_var, ".env")) - files_to_cleanup.append(_create_env_file(os.getcwd(), env_var, ".env")) - subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env_var) - self.wait_hdfs_to_start(kerberized=True, timeout=300) - remove_files(files_to_cleanup) + env = os.environ.copy() + env['KERBERIZED_HDFS_DIR'] = instance.path + '/' + subprocess.check_call(self.base_kerberized_hdfs_cmd + common_opts, env=env) + self.make_hdfs_api(kerberized=True) + self.wait_hdfs_to_start(timeout=300) if self.with_mongo and self.base_mongo_cmd: print('Setup Mongo') @@ -940,7 +931,7 @@ class ClickHouseInstance: if with_kerberized_kafka or with_kerberized_hdfs: self.keytab_path = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets:/tmp/keytab" - self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb_ch.conf:/etc/krb5.conf:ro" + self.krb5_conf = '- ' + os.path.dirname(self.docker_compose_path) + "/secrets/krb.conf:/etc/krb5.conf:ro" else: self.keytab_path = "" self.krb5_conf = "" diff --git a/tests/integration/helpers/hdfs_api.py b/tests/integration/helpers/hdfs_api.py index 79dc72003a5..cb742662855 100644 --- a/tests/integration/helpers/hdfs_api.py +++ b/tests/integration/helpers/hdfs_api.py @@ -4,6 +4,7 @@ import gzip import subprocess import time from tempfile import NamedTemporaryFile +import requests import requests_kerberos as reqkerb import socket import tempfile @@ -13,9 +14,9 @@ import os g_dns_hook = None def custom_getaddrinfo(*args): - print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None) + # print("from custom_getaddrinfo g_dns_hook is None ", g_dns_hook is None) ret = g_dns_hook.custom_getaddrinfo(*args) - print("g_dns_hook.custom_getaddrinfo result", ret) + # print("g_dns_hook.custom_getaddrinfo result", ret) return ret @@ -28,7 +29,7 @@ class mk_krb_conf(object): with open(self.krb_conf) as f: content = f.read() amended_content = content.replace('hdfskerberos', self.kdc_ip) - self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False) + 
self.amended_krb_conf = tempfile.NamedTemporaryFile(delete=False, mode="w+") self.amended_krb_conf.write(amended_content) self.amended_krb_conf.close() return self.amended_krb_conf.name @@ -36,38 +37,32 @@ class mk_krb_conf(object): if self.amended_krb_conf is not None: self.amended_krb_conf.close() - +# tweak dns resolution to connect to localhost where api_host is in URL class dns_hook(object): def __init__(self, hdfs_api): - print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port) + # print("dns_hook.init ", hdfs_api.kerberized, hdfs_api.host, hdfs_api.data_port, hdfs_api.proxy_port) self.hdfs_api = hdfs_api def __enter__(self): global g_dns_hook g_dns_hook = self - if True: # self.hdfs_api.kerberized: - print("g_dns_hook is None ", g_dns_hook is None) - self.original_getaddrinfo = socket.getaddrinfo - socket.getaddrinfo = custom_getaddrinfo - return self + # print("g_dns_hook is None ", g_dns_hook is None) + self.original_getaddrinfo = socket.getaddrinfo + socket.getaddrinfo = custom_getaddrinfo + return self def __exit__(self, type, value, traceback): global g_dns_hook g_dns_hook = None - if True: # self.hdfs_api.kerberized: - socket.getaddrinfo = self.original_getaddrinfo + socket.getaddrinfo = self.original_getaddrinfo def custom_getaddrinfo(self, *args): (hostname, port) = args[:2] - print("top of custom_getaddrinfo", hostname, port) + # print("top of custom_getaddrinfo", hostname, port) if hostname == self.hdfs_api.host and (port == self.hdfs_api.data_port or port == self.hdfs_api.proxy_port): - print("dns_hook substitute") - return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))] #self.hdfs_api.hdfs_ip + # print("dns_hook substitute") + return [(socket.AF_INET, 1, 6, '', ("127.0.0.1", port))] else: return self.original_getaddrinfo(*args) - -import requests - - class HDFSApi(object): def __init__(self, user, timeout=100, kerberized=False, principal=None, keytab=None, krb_conf=None, @@ -86,14 +81,11 @@ class HDFSApi(object): self.kdc_ip = kdc_ip self.krb_conf = krb_conf - logging.basicConfig(level=logging.DEBUG) - logging.getLogger().setLevel(logging.DEBUG) - requests_log = logging.getLogger("requests.packages.urllib3") - requests_log.setLevel(logging.DEBUG) - requests_log.propagate = True - - - + # logging.basicConfig(level=logging.DEBUG) + # logging.getLogger().setLevel(logging.DEBUG) + # requests_log = logging.getLogger("requests.packages.urllib3") + # requests_log.setLevel(logging.DEBUG) + # requests_log.propagate = True if kerberized: self._run_kinit() @@ -109,23 +101,23 @@ class HDFSApi(object): raise Exception("kerberos principal and keytab are required") with mk_krb_conf(self.krb_conf, self.kdc_ip) as instantiated_krb_conf: - print("instantiated_krb_conf ", instantiated_krb_conf) + # print("instantiated_krb_conf ", instantiated_krb_conf) os.environ["KRB5_CONFIG"] = instantiated_krb_conf cmd = "(kinit -R -t {keytab} -k {principal} || (sleep 5 && kinit -R -t {keytab} -k {principal})) ; klist".format(instantiated_krb_conf=instantiated_krb_conf, keytab=self.keytab, principal=self.principal) - print(cmd) + # print(cmd) start = time.time() while time.time() - start < self.timeout: try: subprocess.call(cmd, shell=True) - print "KDC started, kinit successfully run" + print("KDC started, kinit successfully run") return except Exception as ex: - print "Can't run kinit ... waiting" + str(ex) + print("Can't run kinit ... 
waiting {}".format(str(ex))) time.sleep(1) raise Exception("Kinit running failure") @@ -137,7 +129,7 @@ class HDFSApi(object): response.raise_for_status() # additional_params = '&'.join(response.headers['Location'].split('&')[1:2]) url = "{location}".format(location=response.headers['Location']) - print("redirected to ", url) + # print("redirected to ", url) with dns_hook(self): response_data = requests.get(url, headers={'host': 'localhost'}, @@ -149,20 +141,6 @@ class HDFSApi(object): else: return response_data.content - # Requests can't put file - def _curl_to_put(self, filename, path, params): - url = "{protocol}://{host}:{port}/webhdfs/v1{path}?op=CREATE&{params}".format(protocol=self.protocol, - host=self.host, - port=self.data_port, - path=path, - params=params) - if self.kerberized: - cmd = "curl -k --negotiate -s -i -X PUT -T {fname} -u : '{url}' --resolve {host}:{port}:127.0.0.1".format(fname=filename, url=url) - else: - cmd = "curl -s -i -X PUT -T {fname} '{url}'".format(fname=filename, url=url) - output = subprocess.check_output(cmd, shell=True) - return output - def write_data(self, path, content): named_file = NamedTemporaryFile(mode='wb+') fpath = named_file.name @@ -173,12 +151,9 @@ class HDFSApi(object): if self.kerberized: - print("before request.put", os.environ["KRB5_CONFIG"]) self._run_kinit() - # cmd = "klist" - # subprocess.call(cmd, shell=True) self.kerberos_auth = reqkerb.HTTPKerberosAuth(mutual_authentication=reqkerb.DISABLED, hostname_override=self.host, principal=self.principal) - print(self.kerberos_auth) + # print(self.kerberos_auth) with dns_hook(self): response = requests.put( @@ -190,34 +165,26 @@ class HDFSApi(object): params={'overwrite' : 'true'}, verify=False, auth=self.kerberos_auth ) - print("after request.put", response.status_code) if response.status_code != 307: - print(response.headers) + # print(response.headers) response.raise_for_status() - print("after status code check") - additional_params = '&'.join( response.headers['Location'].split('&')[1:2] + ["user.name={}".format(self.user), "overwrite=true"]) - if False: #not self.kerberized: - output = self._curl_to_put(fpath, path, additional_params) - if "201 Created" not in output: - raise Exception("Can't create file on hdfs:\n {}".format(output)) - else: - with dns_hook(self), open(fpath) as fh: - file_data = fh.read() - protocol = "http" # self.protocol - response = requests.put( - "{location}".format(location=response.headers['Location']), - data=file_data, - headers={'content-type':'text/plain', 'host': 'localhost'}, - params={'file': path, 'user.name' : self.user}, - allow_redirects=False, verify=False, auth=self.kerberos_auth - ) - print(response) - if response.status_code != 201: - response.raise_for_status() + with dns_hook(self), open(fpath, mode="rb") as fh: + file_data = fh.read() + protocol = "http" # self.protocol + response = requests.put( + "{location}".format(location=response.headers['Location']), + data=file_data, + headers={'content-type':'text/plain', 'host': 'localhost'}, + params={'file': path, 'user.name' : self.user}, + allow_redirects=False, verify=False, auth=self.kerberos_auth + ) + # print(response) + if response.status_code != 201: + response.raise_for_status() def write_gzip_data(self, path, content): diff --git a/tests/integration/test_allowed_url_from_config/test.py b/tests/integration/test_allowed_url_from_config/test.py index 44715d92121..6442937c8f4 100644 --- a/tests/integration/test_allowed_url_from_config/test.py +++ 
b/tests/integration/test_allowed_url_from_config/test.py @@ -1,6 +1,5 @@ import pytest from helpers.cluster import ClickHouseCluster -from helpers.hdfs_api import HDFSApi cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', main_configs=['configs/config_with_hosts.xml']) @@ -101,9 +100,8 @@ def test_table_function_remote(start_cluster): def test_redirect(start_cluster): - hdfs_api = HDFSApi("root") - hdfs_api.write_data("/simple_storage", "1\t\n") - assert hdfs_api.read_data("/simple_storage") == "1\t\n" + start_cluster.hdfs_api.write_data("/simple_storage", "1\t\n") + assert start_cluster.hdfs_api.read_data("/simple_storage") == "1\t\n" node7.query( "CREATE TABLE table_test_7_1 (word String) ENGINE=URL('http://hdfs1:50070/webhdfs/v1/simple_storage?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0', CSV)") assert "not allowed" in node7.query_and_get_error("SET max_http_get_redirects=1; SELECT * from table_test_7_1") diff --git a/tests/integration/test_redirect_url_storage/test.py b/tests/integration/test_redirect_url_storage/test.py index f93548af0db..f2731794d43 100644 --- a/tests/integration/test_redirect_url_storage/test.py +++ b/tests/integration/test_redirect_url_storage/test.py @@ -17,9 +17,8 @@ def started_cluster(): def test_url_without_redirect(started_cluster): - hdfs_api = HDFSApi("root") - hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") - assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" + started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") + assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" # access datanode port directly node1.query( @@ -28,9 +27,8 @@ def test_url_without_redirect(started_cluster): def test_url_with_redirect_not_allowed(started_cluster): - hdfs_api = HDFSApi("root") - hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") - assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" + started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") + assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" # access proxy port without allowing redirects node1.query( @@ -40,9 +38,8 @@ def test_url_with_redirect_not_allowed(started_cluster): def test_url_with_redirect_allowed(started_cluster): - hdfs_api = HDFSApi("root") - hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") - assert hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" + started_cluster.hdfs_api.write_data("/simple_storage", "1\tMark\t72.53\n") + assert started_cluster.hdfs_api.read_data("/simple_storage") == "1\tMark\t72.53\n" # access proxy port with allowing redirects # http://localhost:50070/webhdfs/v1/b?op=OPEN&namenoderpcaddress=hdfs1:9000&offset=0 diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py index eebc52a01a3..a6c8b7e1ee9 100644 --- a/tests/integration/test_storage_hdfs/test.py +++ b/tests/integration/test_storage_hdfs/test.py @@ -203,5 +203,5 @@ def test_write_gzip_storage(started_cluster): if __name__ == '__main__': cluster.start() - raw_input("Cluster created, press any key to destroy...") + input("Cluster created, press any key to destroy...") cluster.shutdown() diff --git a/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf b/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf index 98cc1e6810a..2c1e6f15f77 100644 --- a/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf +++ 
b/tests/integration/test_storage_kerberized_hdfs/secrets/krb.conf @@ -7,8 +7,7 @@ default_realm = TEST.CLICKHOUSE.TECH dns_lookup_realm = false dns_lookup_kdc = false - ticket_lifetime = 15d - # renew_lifetime = 15d + ticket_lifetime = 15s forwardable = true default_tgs_enctypes = des3-hmac-sha1 default_tkt_enctypes = des3-hmac-sha1 diff --git a/tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf b/tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf similarity index 92% rename from tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf rename to tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf index 850e9622109..ec1c54240e8 100644 --- a/tests/integration/test_storage_kerberized_hdfs/secrets/krb_ch.conf +++ b/tests/integration/test_storage_kerberized_hdfs/secrets/krb_long.conf @@ -7,8 +7,7 @@ default_realm = TEST.CLICKHOUSE.TECH dns_lookup_realm = false dns_lookup_kdc = false - ticket_lifetime = 15s - # renew_lifetime = 15d + ticket_lifetime = 15d forwardable = true default_tgs_enctypes = des3-hmac-sha1 default_tkt_enctypes = des3-hmac-sha1 diff --git a/tests/integration/test_storage_kerberized_hdfs/test.py b/tests/integration/test_storage_kerberized_hdfs/test.py index 9e9412a99ed..b5b330f7c78 100644 --- a/tests/integration/test_storage_kerberized_hdfs/test.py +++ b/tests/integration/test_storage_kerberized_hdfs/test.py @@ -6,7 +6,6 @@ import os from helpers.cluster import ClickHouseCluster import subprocess - cluster = ClickHouseCluster(__file__) node1 = cluster.add_instance('node1', with_kerberized_hdfs=True, user_configs=[], main_configs=['configs/log_conf.xml', 'configs/hdfs.xml']) @@ -24,59 +23,40 @@ def started_cluster(): cluster.shutdown() def test_read_table(started_cluster): - # hdfs_api = HDFSApi("root") data = "1\tSerialize\t555.222\n2\tData\t777.333\n" started_cluster.hdfs_api.write_data("/simple_table_function", data) api_read = started_cluster.hdfs_api.read_data("/simple_table_function") - print("api_read", api_read) - assert api_read == data select_read = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/simple_table_function', 'TSV', 'id UInt64, text String, number Float64')") - print("select_read", select_read) - assert select_read == data def test_read_write_storage(started_cluster): - # node1.query("create table SimpleHDFSStorage (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberized_hdfs1.test.clickhouse.tech:9000/simple_storage', 'TSV')") node1.query("create table SimpleHDFSStorage2 (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage1', 'TSV')") node1.query("insert into SimpleHDFSStorage2 values (1, 'Mark', 72.53)") api_read = started_cluster.hdfs_api.read_data("/simple_storage1") - print("api_read", api_read) assert api_read == "1\tMark\t72.53\n" select_read = node1.query("select * from SimpleHDFSStorage2") - print("select_read", select_read) assert select_read == "1\tMark\t72.53\n" -def test_write_storage_expired(started_cluster): - node1.query("create table SimpleHDFSStorageExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_expired', 'TSV')") +def test_write_storage_not_expired(started_cluster): + node1.query("create table SimpleHDFSStorageNotExpired (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/simple_storage_not_expired', 'TSV')") time.sleep(45) # wait for ticket expiration - node1.query("insert into SimpleHDFSStorageExpired values 
(1, 'Mark', 72.53)") + node1.query("insert into SimpleHDFSStorageNotExpired values (1, 'Mark', 72.53)") - api_read = started_cluster.hdfs_api.read_data("/simple_storage_expired") - print("api_read", api_read) + api_read = started_cluster.hdfs_api.read_data("/simple_storage_not_expired") assert api_read == "1\tMark\t72.53\n" - select_read = node1.query("select * from SimpleHDFSStorageExpired") - print("select_read", select_read) + select_read = node1.query("select * from SimpleHDFSStorageNotExpired") assert select_read == "1\tMark\t72.53\n" -def test_prohibited(started_cluster): - node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')") - try: - node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)") - assert False, "Exception have to be thrown" - except Exception as ex: - assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex) - - def test_two_users(started_cluster): node1.query("create table HDFSStorOne (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/storage_user_one', 'TSV')") node1.query("insert into HDFSStorOne values (1, 'IlyaReal', 86.00)") @@ -85,31 +65,10 @@ def test_two_users(started_cluster): node1.query("insert into HDFSStorTwo values (1, 'IlyaIdeal', 74.00)") select_read_1 = node1.query("select * from hdfs('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV', 'id UInt64, text String, number Float64')") - print("select_read_1", select_read_1) select_read_2 = node1.query("select * from hdfs('hdfs://suser@kerberizedhdfs1:9000/storage_user_one', 'TSV', 'id UInt64, text String, number Float64')") - print("select_read_2", select_read_2) - # node1.query("create table HDFSStorTwo_ (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://kerberizedhdfs1:9000/user/specuser/storage_user_two', 'TSV')") - # try: - # node1.query("insert into HDFSStorTwo_ values (1, 'AnotherPerspn', 88.54)") - # assert False, "Exception have to be thrown" - # except Exception as ex: - # print ex - # assert "DB::Exception: Unable to open HDFS file: /user/specuser/storage_user_two error: Permission denied: user=root, access=WRITE, inode=\"/user/specuser/storage_user_two\":specuser:supergroup:drwxr-xr-x" in str(ex) - - -def test_cache_path(started_cluster): - node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')") - try: - node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)") - assert False, "Exception have to be thrown" - except Exception as ex: - assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex) - - - -def test_read_table_not_expired(started_cluster): +def test_read_table_expired(started_cluster): data = "1\tSerialize\t555.222\n2\tData\t777.333\n" started_cluster.hdfs_api.write_data("/simple_table_function_relogin", data) @@ -125,13 +84,25 @@ def test_read_table_not_expired(started_cluster): started_cluster.unpause_container('hdfskerberos') +def test_prohibited(started_cluster): + node1.query("create table HDFSStorTwoProhibited (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://suser@kerberizedhdfs1:9000/storage_user_two_prohibited', 'TSV')") + try: + node1.query("insert into HDFSStorTwoProhibited values (1, 'SomeOne', 74.00)") + 
assert False, "Exception have to be thrown" + except Exception as ex: + assert "Unable to open HDFS file: /storage_user_two_prohibited error: Permission denied: user=specuser, access=WRITE" in str(ex) -@pytest.mark.timeout(999999) -def _test_sleep_forever(started_cluster): - time.sleep(999999) + +def test_cache_path(started_cluster): + node1.query("create table HDFSStorCachePath (id UInt32, name String, weight Float64) ENGINE = HDFS('hdfs://dedicatedcachepath@kerberizedhdfs1:9000/storage_dedicated_cache_path', 'TSV')") + try: + node1.query("insert into HDFSStorCachePath values (1, 'FatMark', 92.53)") + assert False, "Exception have to be thrown" + except Exception as ex: + assert "DB::Exception: hadoop.security.kerberos.ticket.cache.path cannot be set per user" in str(ex) if __name__ == '__main__': cluster.start() - raw_input("Cluster created, press any key to destroy...") + input("Cluster created, press any key to destroy...") cluster.shutdown()