From cc5f49fbee5fe70ccb61f0f3750d3cbc9dea7455 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 19 Apr 2021 19:43:22 +0000
Subject: [PATCH 1/3] Fix

---
 src/Storages/HDFS/ReadBufferFromHDFS.cpp | 30 +++++++++++++++++++-----------
 1 file changed, 19 insertions(+), 11 deletions(-)

diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
index affb76314b1..aaf3c2c964b 100644
--- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp
+++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
@@ -22,33 +22,37 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
     static std::mutex hdfs_init_mutex;
 
     std::string hdfs_uri;
+    std::string hdfs_file_path;
     hdfsFile fin;
     HDFSBuilderWrapper builder;
     HDFSFSPtr fs;
 
-    ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
+    explicit ReadBufferFromHDFSImpl(
+        const std::string & hdfs_uri_,
+        const std::string & hdfs_file_path_,
         const Poco::Util::AbstractConfiguration & config_)
-        : hdfs_uri(hdfs_name_),
-        builder(createHDFSBuilder(hdfs_uri, config_))
+        : hdfs_uri(hdfs_uri_)
+        , hdfs_file_path(hdfs_file_path_)
+        , builder(createHDFSBuilder(hdfs_uri_, config_))
     {
         std::lock_guard lock(hdfs_init_mutex);
 
         fs = createHDFSFS(builder.get());
-        const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
-        const std::string path = hdfs_uri.substr(begin_of_path);
-        fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
+        fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
 
         if (fin == nullptr)
-            throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
-                ErrorCodes::CANNOT_OPEN_FILE);
+            throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
+                "Unable to open HDFS file: {}. Error: {}",
+                hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
     }
 
     int read(char * start, size_t size) const
     {
         int bytes_read = hdfsRead(fs.get(), fin, start, size);
         if (bytes_read < 0)
-            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
+            throw Exception(ErrorCodes::NETWORK_ERROR,
+                "Fail to read from HDFS: {}, file path: {}. Error: {}",
+                hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
         return bytes_read;
     }
 
@@ -66,8 +70,12 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
     const Poco::Util::AbstractConfiguration & config_,
     size_t buf_size_)
     : BufferWithOwnMemory<ReadBuffer>(buf_size_)
-    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
 {
+    const size_t begin_of_path = hdfs_name_.find('/', hdfs_name_.find("//") + 2);
+    const String hdfs_file_path = hdfs_name_.substr(begin_of_path);
+    const String hdfs_uri = hdfs_name_.substr(0, begin_of_path) + "/";
+
+    impl = std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri, hdfs_file_path, config_);
 }
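The point of PATCH 1/3 is that only the connection part of the URI now reaches createHDFSBuilder (and thus URI parsing), while the file path travels separately and is handed to hdfsOpenFile verbatim. That is what makes paths containing spaces usable; the split itself happens once, in the public constructor. A minimal sketch of that split, using a file name from the test added in the next patch (the sample values are illustrative, not part of the patch):

    // Assumed input, as the HDFS storage engine would pass it:
    std::string hdfs_name = "hdfs://hdfs1:9000/test test test 1.txt";
    // The path starts at the first '/' after the "host:port" authority:
    size_t begin_of_path = hdfs_name.find('/', hdfs_name.find("//") + 2);    // 17
    std::string hdfs_file_path = hdfs_name.substr(begin_of_path);            // "/test test test 1.txt"
    std::string hdfs_uri = hdfs_name.substr(0, begin_of_path) + "/";         // "hdfs://hdfs1:9000/"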
From bb767b66d5ab9207e03de7c9086227491fa45be2 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Mon, 19 Apr 2021 20:39:22 +0000
Subject: [PATCH 2/3] Add integration test

---
 tests/integration/test_storage_hdfs/test.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/tests/integration/test_storage_hdfs/test.py b/tests/integration/test_storage_hdfs/test.py
index a6c8b7e1ee9..90cc72240d5 100644
--- a/tests/integration/test_storage_hdfs/test.py
+++ b/tests/integration/test_storage_hdfs/test.py
@@ -201,6 +201,15 @@ def test_write_gzip_storage(started_cluster):
     assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
     assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"
 
+
+def test_read_files_with_spaces(started_cluster):
+    started_cluster.hdfs_api.write_data("/test test test 1.txt", "1\n")
+    started_cluster.hdfs_api.write_data("/test test test 2.txt", "2\n")
+    started_cluster.hdfs_api.write_data("/test test test 3.txt", "3\n")
+    node1.query("create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/test*', 'TSV')")
+    assert node1.query("select * from test order by id") == "1\n2\n3\n"
+
+
 if __name__ == '__main__':
     cluster.start()
     input("Cluster created, press any key to destroy...")
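The test above drives the new code path end to end through the HDFS table engine. For illustration, the same fix can also be exercised directly against the two-argument reader; this is a sketch rather than code from the patches, `config` is assumed to be an already-loaded Poco::Util::AbstractConfiguration, and readStringUntilEOF comes from IO/ReadHelpers.h:

    // The path is passed verbatim, spaces included; no URI re-parsing happens.
    ReadBufferFromHDFS in("hdfs://hdfs1:9000/", "/test test test 1.txt", config);
    String data;
    readStringUntilEOF(data, in);   // data == "1\n"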
From da7a4ac1ea426cac874675db2c99b6bf85afea24 Mon Sep 17 00:00:00 2001
From: kssenii
Date: Tue, 20 Apr 2021 07:53:55 +0000
Subject: [PATCH 3/3] Better

---
 src/Storages/HDFS/HDFSCommon.cpp         |  9 +--
 src/Storages/HDFS/HDFSCommon.h           | 80 ++++++++++--------
 src/Storages/HDFS/ReadBufferFromHDFS.cpp | 20 +++---
 src/Storages/HDFS/ReadBufferFromHDFS.h   | 15 +++--
 src/Storages/HDFS/StorageHDFS.cpp        | 15 ++++-
 src/Storages/HDFS/StorageHDFS.h          |  2 +-
 6 files changed, 72 insertions(+), 69 deletions(-)

diff --git a/src/Storages/HDFS/HDFSCommon.cpp b/src/Storages/HDFS/HDFSCommon.cpp
index e5ec8a06139..40f52921008 100644
--- a/src/Storages/HDFS/HDFSCommon.cpp
+++ b/src/Storages/HDFS/HDFSCommon.cpp
@@ -9,14 +9,15 @@
 #include
 #include
 
+
 namespace DB
 {
 namespace ErrorCodes
 {
-extern const int BAD_ARGUMENTS;
-extern const int NETWORK_ERROR;
-extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
-extern const int NO_ELEMENTS_IN_CONFIG;
+    extern const int BAD_ARGUMENTS;
+    extern const int NETWORK_ERROR;
+    extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
+    extern const int NO_ELEMENTS_IN_CONFIG;
 }
 
 const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";
diff --git a/src/Storages/HDFS/HDFSCommon.h b/src/Storages/HDFS/HDFSCommon.h
index fa1ca88464e..154c253a76b 100644
--- a/src/Storages/HDFS/HDFSCommon.h
+++ b/src/Storages/HDFS/HDFSCommon.h
@@ -17,6 +17,7 @@
 
 namespace DB
 {
+
 namespace detail
 {
     struct HDFSFsDeleter
@@ -28,16 +29,14 @@ namespace detail
     };
 }
 
+
 struct HDFSFileInfo
 {
     hdfsFileInfo * file_info;
     int length;
 
-    HDFSFileInfo()
-        : file_info(nullptr)
-        , length(0)
-    {
-    }
+    HDFSFileInfo() : file_info(nullptr) , length(0) {}
+
     HDFSFileInfo(const HDFSFileInfo & other) = delete;
     HDFSFileInfo(HDFSFileInfo && other) = default;
     HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
@@ -49,17 +48,30 @@ struct HDFSFileInfo
     }
 };
 
+
 class HDFSBuilderWrapper
 {
-    hdfsBuilder * hdfs_builder;
-    String hadoop_kerberos_keytab;
-    String hadoop_kerberos_principal;
-    String hadoop_kerberos_kinit_command = "kinit";
-    String hadoop_security_kerberos_ticket_cache_path;
-    static std::mutex kinit_mtx;
+friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
 
-    std::vector<std::pair<String, String>> config_stor;
+static const String CONFIG_PREFIX;
+
+public:
+    HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {}
+
+    ~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }
+
+    HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
+    HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
+
+    hdfsBuilder * get() { return hdfs_builder; }
+
+private:
+    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
+
+    String getKinitCmd();
+
+    void runKinit();
 
     // hdfs builder relies on an external config data storage
     std::pair<String, String> & keep(const String & k, const String & v)
@@ -67,48 +79,24 @@ class HDFSBuilderWrapper
         return config_stor.emplace_back(std::make_pair(k, v));
     }
 
+    hdfsBuilder * hdfs_builder;
+    String hadoop_kerberos_keytab;
+    String hadoop_kerberos_principal;
+    String hadoop_kerberos_kinit_command = "kinit";
+    String hadoop_security_kerberos_ticket_cache_path;
+
+    static std::mutex kinit_mtx;
+    std::vector<std::pair<String, String>> config_stor;
     bool need_kinit{false};
-
-    static const String CONFIG_PREFIX;
-
-private:
-
-    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
-
-    String getKinitCmd();
-
-    void runKinit();
-
-public:
-
-    hdfsBuilder *
-    get()
-    {
-        return hdfs_builder;
-    }
-
-    HDFSBuilderWrapper()
-        : hdfs_builder(hdfsNewBuilder())
-    {
-    }
-
-    ~HDFSBuilderWrapper()
-    {
-        hdfsFreeBuilder(hdfs_builder);
-
-    }
-
-    HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
-    HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
-
-    friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
 };
 
 using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;
 
+
 // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
 /// TODO Allow to tune from query Settings.
 HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
 
 HDFSFSPtr createHDFSFS(hdfsBuilder * builder);
+
 }
 #endif
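The HDFSCommon.h changes are a layout pass — interface first, data members last — with no behavioral change. One property the class keeps is being move-only, which matches how createHDFSBuilder hands the wrapper out by value. Roughly, as a sketch (with `config` assumed to be loaded elsewhere):

    HDFSBuilderWrapper builder = createHDFSBuilder("hdfs://hdfs1:9000/", config);  // moved out
    HDFSFSPtr fs = createHDFSFS(builder.get());
    // HDFSBuilderWrapper copy = builder;   // ill-formed: the copy constructor is deleted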
diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
index aaf3c2c964b..29ea46c7590 100644
--- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp
+++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp
@@ -8,6 +8,7 @@
 
 namespace DB
 {
+
 namespace ErrorCodes
 {
     extern const int NETWORK_ERROR;
@@ -21,8 +22,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
     /// HDFS create/open functions are not thread safe
     static std::mutex hdfs_init_mutex;
 
-    std::string hdfs_uri;
-    std::string hdfs_file_path;
+    String hdfs_uri;
+    String hdfs_file_path;
+
     hdfsFile fin;
     HDFSBuilderWrapper builder;
     HDFSFSPtr fs;
@@ -66,16 +68,14 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
 
 std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;
 
-ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
-    const Poco::Util::AbstractConfiguration & config_,
-    size_t buf_size_)
+ReadBufferFromHDFS::ReadBufferFromHDFS(
+    const String & hdfs_uri_,
+    const String & hdfs_file_path_,
+    const Poco::Util::AbstractConfiguration & config_,
+    size_t buf_size_)
     : BufferWithOwnMemory<ReadBuffer>(buf_size_)
+    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_))
 {
-    const size_t begin_of_path = hdfs_name_.find('/', hdfs_name_.find("//") + 2);
-    const String hdfs_file_path = hdfs_name_.substr(begin_of_path);
-    const String hdfs_uri = hdfs_name_.substr(0, begin_of_path) + "/";
-
-    impl = std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri, hdfs_file_path, config_);
 }
diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.h b/src/Storages/HDFS/ReadBufferFromHDFS.h
index 8d26c001b2e..bd14e3d3792 100644
--- a/src/Storages/HDFS/ReadBufferFromHDFS.h
+++ b/src/Storages/HDFS/ReadBufferFromHDFS.h
@@ -7,11 +7,8 @@
 #include
 #include
 #include
-
 #include
-
 #include
-
 #include
@@ -22,13 +19,19 @@ namespace DB
  */
 class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
 {
-    struct ReadBufferFromHDFSImpl;
-    std::unique_ptr<ReadBufferFromHDFSImpl> impl;
+struct ReadBufferFromHDFSImpl;
+
 public:
-    ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
+    ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
+        const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
+
     ~ReadBufferFromHDFS() override;
 
     bool nextImpl() override;
+
+private:
+    std::unique_ptr<ReadBufferFromHDFSImpl> impl;
 };
 }
+
 #endif
diff --git a/src/Storages/HDFS/StorageHDFS.cpp b/src/Storages/HDFS/StorageHDFS.cpp
index 392495ff77f..ad2a63c44b1 100644
--- a/src/Storages/HDFS/StorageHDFS.cpp
+++ b/src/Storages/HDFS/StorageHDFS.cpp
@@ -122,7 +122,7 @@ public:
             current_path = uri + path;
 
             auto compression = chooseCompressionMethod(path, compression_method);
-            auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, getContext()->getGlobalContext()->getConfigRef()), compression);
+            auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
             auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);
 
             auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
@@ -271,7 +271,15 @@ Pipe StorageHDFS::read(
     size_t max_block_size,
     unsigned num_streams)
 {
-    const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
+    size_t begin_of_path;
+    /// This uri is checked for correctness in constructor of StorageHDFS and never modified afterwards
+    auto two_slash = uri.find("//");
+
+    if (two_slash == std::string::npos)
+        begin_of_path = uri.find('/');
+    else
+        begin_of_path = uri.find('/', two_slash + 2);
+
     const String path_from_uri = uri.substr(begin_of_path);
     const String uri_without_path = uri.substr(0, begin_of_path);
@@ -281,6 +289,9 @@ Pipe StorageHDFS::read(
     auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();
     sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri);
 
+    if (sources_info->uris.empty())
+        LOG_WARNING(log, "No file in HDFS matches the path: {}", uri);
+
     for (const auto & column : column_names)
     {
         if (column == "_path")
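The rewritten begin_of_path computation in StorageHDFS::read keeps working when the URI is given without the scheme's double slash. A self-contained sketch of the same splitting logic, with assumed sample URIs (the helper name is hypothetical):

    #include <cassert>
    #include <string>
    #include <utility>

    // Mirrors the hunk above: find where the path starts, with a fallback
    // for URIs that contain no "//".
    static std::pair<std::string, std::string> splitHDFSUri(const std::string & uri)
    {
        size_t begin_of_path;
        auto two_slash = uri.find("//");
        if (two_slash == std::string::npos)
            begin_of_path = uri.find('/');
        else
            begin_of_path = uri.find('/', two_slash + 2);
        return {uri.substr(0, begin_of_path), uri.substr(begin_of_path)};
    }

    int main()
    {
        assert(splitHDFSUri("hdfs://hdfs1:9000/dir/file").first == "hdfs://hdfs1:9000");
        assert(splitHDFSUri("hdfs1:9000/dir/file").second == "/dir/file");
        return 0;
    }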
diff --git a/src/Storages/HDFS/StorageHDFS.h b/src/Storages/HDFS/StorageHDFS.h
index 0dd57685354..e3f235296ac 100644
--- a/src/Storages/HDFS/StorageHDFS.h
+++ b/src/Storages/HDFS/StorageHDFS.h
@@ -42,7 +42,7 @@ protected:
         const String & compression_method_);
 
 private:
-    String uri;
+    const String uri;
     String format_name;
     String compression_method;