diff --git a/src/Storages/HDFS/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp index e5ec8a06139..40f52921008 100644 --- a/src/Storages/HDFS/HDFSCommon.cpp +++ b/src/Storages/HDFS/HDFSCommon.cpp @@ -9,14 +9,15 @@ #include #include + namespace DB { namespace ErrorCodes { -extern const int BAD_ARGUMENTS; -extern const int NETWORK_ERROR; -extern const int EXCESSIVE_ELEMENT_IN_CONFIG; -extern const int NO_ELEMENTS_IN_CONFIG; + extern const int BAD_ARGUMENTS; + extern const int NETWORK_ERROR; + extern const int EXCESSIVE_ELEMENT_IN_CONFIG; + extern const int NO_ELEMENTS_IN_CONFIG; } const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs"; diff --git a/src/Storages/HDFS/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h index fa1ca88464e..154c253a76b 100644 --- a/src/Storages/HDFS/HDFSCommon.h +++ b/src/Storages/HDFS/HDFSCommon.h @@ -17,6 +17,7 @@ namespace DB { + namespace detail { struct HDFSFsDeleter @@ -28,16 +29,14 @@ namespace detail }; } + struct HDFSFileInfo { hdfsFileInfo * file_info; int length; - HDFSFileInfo() - : file_info(nullptr) - , length(0) - { - } + HDFSFileInfo() : file_info(nullptr) , length(0) {} + HDFSFileInfo(const HDFSFileInfo & other) = delete; HDFSFileInfo(HDFSFileInfo && other) = default; HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete; @@ -49,17 +48,30 @@ struct HDFSFileInfo } }; + class HDFSBuilderWrapper { - hdfsBuilder * hdfs_builder; - String hadoop_kerberos_keytab; - String hadoop_kerberos_principal; - String hadoop_kerberos_kinit_command = "kinit"; - String hadoop_security_kerberos_ticket_cache_path; - static std::mutex kinit_mtx; +friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &); - std::vector> config_stor; +static const String CONFIG_PREFIX; + +public: + HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {} + + ~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); } + + HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete; + HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default; + + hdfsBuilder * get() { return hdfs_builder; } + +private: + void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false); + + String getKinitCmd(); + + void runKinit(); // hdfs builder relies on an external config data storage std::pair& keep(const String & k, const String & v) @@ -67,48 +79,24 @@ class HDFSBuilderWrapper return config_stor.emplace_back(std::make_pair(k, v)); } + hdfsBuilder * hdfs_builder; + String hadoop_kerberos_keytab; + String hadoop_kerberos_principal; + String hadoop_kerberos_kinit_command = "kinit"; + String hadoop_security_kerberos_ticket_cache_path; + + static std::mutex kinit_mtx; + std::vector> config_stor; bool need_kinit{false}; - - static const String CONFIG_PREFIX; - -private: - - void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false); - - String getKinitCmd(); - - void runKinit(); - -public: - - hdfsBuilder * - get() - { - return hdfs_builder; - } - - HDFSBuilderWrapper() - : hdfs_builder(hdfsNewBuilder()) - { - } - - ~HDFSBuilderWrapper() - { - hdfsFreeBuilder(hdfs_builder); - - } - - HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete; - HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default; - - friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &); }; using HDFSFSPtr = std::unique_ptr, detail::HDFSFsDeleter>; + // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large /// TODO Allow to tune from query Settings. HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &); HDFSFSPtr createHDFSFS(hdfsBuilder * builder); + } #endif diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp index aaf3c2c964b..29ea46c7590 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -8,6 +8,7 @@ namespace DB { + namespace ErrorCodes { extern const int NETWORK_ERROR; @@ -21,8 +22,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl /// HDFS create/open functions are not thread safe static std::mutex hdfs_init_mutex; - std::string hdfs_uri; - std::string hdfs_file_path; + String hdfs_uri; + String hdfs_file_path; + hdfsFile fin; HDFSBuilderWrapper builder; HDFSFSPtr fs; @@ -66,16 +68,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex; -ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, - const Poco::Util::AbstractConfiguration & config_, - size_t buf_size_) +ReadBufferFromHDFS::ReadBufferFromHDFS( + const String & hdfs_uri_, + const String & hdfs_file_path_, + const Poco::Util::AbstractConfiguration & config_, + size_t buf_size_) : BufferWithOwnMemory(buf_size_) + , impl(std::make_unique(hdfs_uri_, hdfs_file_path_, config_)) { - const size_t begin_of_path = hdfs_name_.find('/', hdfs_name_.find("//") + 2); - const String hdfs_file_path = hdfs_name_.substr(begin_of_path); - const String hdfs_uri = hdfs_name_.substr(0, begin_of_path) + "/"; - - impl = std::make_unique(hdfs_uri, hdfs_file_path, config_); } diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h index 8d26c001b2e..bd14e3d3792 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.h +++ b/src/Storages/HDFS/ReadBufferFromHDFS.h @@ -7,11 +7,8 @@ #include #include #include - #include - #include - #include @@ -22,13 +19,19 @@ namespace DB */ class ReadBufferFromHDFS : public BufferWithOwnMemory { - struct ReadBufferFromHDFSImpl; - std::unique_ptr impl; +struct ReadBufferFromHDFSImpl; + public: - ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_, + const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + ~ReadBufferFromHDFS() override; bool nextImpl() override; + +private: + std::unique_ptr impl; }; } + #endif diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp index 392495ff77f..ad2a63c44b1 100644 --- a/src/Storages/HDFS/StorageHDFS.cpp +++ b/src/Storages/HDFS/StorageHDFS.cpp @@ -122,7 +122,7 @@ public: current_path = uri + path; auto compression = chooseCompressionMethod(path, compression_method); - auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(current_path, getContext()->getGlobalContext()->getConfigRef()), compression); + auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression); auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size); auto input_stream = std::make_shared(input_format); @@ -271,7 +271,15 @@ Pipe StorageHDFS::read( size_t max_block_size, unsigned num_streams) { - const size_t begin_of_path = uri.find('/', uri.find("//") + 2); + size_t begin_of_path; + /// This uri is checked for correctness in constructor of StorageHDFS and never modified afterwards + auto two_slash = uri.find("//"); + + if (two_slash == std::string::npos) + begin_of_path = uri.find('/'); + else + begin_of_path = uri.find('/', two_slash + 2); + const String path_from_uri = uri.substr(begin_of_path); const String uri_without_path = uri.substr(0, begin_of_path); @@ -281,6 +289,9 @@ Pipe StorageHDFS::read( auto sources_info = std::make_shared(); sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri); + if (sources_info->uris.empty()) + LOG_WARNING(log, "No file in HDFS matches the path: {}", uri); + for (const auto & column : column_names) { if (column == "_path") diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h index 0dd57685354..e3f235296ac 100644 --- a/src/Storages/HDFS/StorageHDFS.h +++ b/src/Storages/HDFS/StorageHDFS.h @@ -42,7 +42,7 @@ protected: const String & compression_method_); private: - String uri; + const String uri; String format_name; String compression_method;