diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index b589c398238..4a0652e86f2 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -390,8 +390,8 @@ if (USE_PROTOBUF)
 endif ()
 
 if (USE_HDFS)
-    target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
-    target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
+    target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
+    target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
 endif()
 
 if (USE_BROTLI)
diff --git a/dbms/src/IO/HDFSCommon.h b/dbms/src/IO/HDFSCommon.h
index 8c526d908bb..0be00de08dd 100644
--- a/dbms/src/IO/HDFSCommon.h
+++ b/dbms/src/IO/HDFSCommon.h
@@ -27,6 +27,52 @@ struct HDFSFsDeleter
 }
 
 
+/// RAII owner of the hdfsFileInfo array returned by hdfsListDirectory().
+/// Movable but not copyable: exactly one owner frees the array.
+struct HDFSFileInfo
+{
+    hdfsFileInfo * file_info;
+    int length;
+
+    HDFSFileInfo()
+        : file_info(nullptr)
+        , length(0)
+    {
+    }
+    HDFSFileInfo(const HDFSFileInfo & other) = delete;
+    HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
+
+    /// A defaulted move copies the pointer and leaves it set in the source, so
+    /// both objects would later call hdfsFreeFileInfo on it (double free), and a
+    /// defaulted move-assignment would also leak the destination's old array.
+    /// Transfer ownership explicitly instead.
+    HDFSFileInfo(HDFSFileInfo && other) noexcept
+        : file_info(other.file_info)
+        , length(other.length)
+    {
+        other.file_info = nullptr;
+        other.length = 0;
+    }
+    HDFSFileInfo & operator=(HDFSFileInfo && other) noexcept
+    {
+        if (this != &other)
+        {
+            if (file_info)
+                hdfsFreeFileInfo(file_info, length);
+            file_info = other.file_info;
+            length = other.length;
+            other.file_info = nullptr;
+            other.length = 0;
+        }
+        return *this;
+    }
+
+    ~HDFSFileInfo()
+    {
+        /// file_info is null in the default-constructed and moved-from states.
+        if (file_info)
+            hdfsFreeFileInfo(file_info, length);
+    }
+};
 
 using HDFSBuilderPtr = std::unique_ptr;
 using HDFSFSPtr = std::unique_ptr, detail::HDFSFsDeleter>;
diff --git a/dbms/src/Storages/StorageHDFS.cpp b/dbms/src/Storages/StorageHDFS.cpp
index 8c87f4ccd6a..7d9d123e5a3 100644
--- a/dbms/src/Storages/StorageHDFS.cpp
+++ b/dbms/src/Storages/StorageHDFS.cpp
@@ -9,12 +9,18 @@
 #include 
 #include 
 #include 
+#include <IO/HDFSCommon.h>
 #include 
 #include 
 #include 
 #include 
 #include 
-
+#include <Poco/URI.h>
+#include <re2/re2.h>
+#include <re2/stringpiece.h>
+#include <hdfs/hdfs.h>
+#include <Common/parseGlobs.h>
+// NOTE(review): the added include targets above were reconstructed from usage
+// in this file; the originals were lost in transcription - verify before merge.
 
 namespace DB
 {
@@ -127,6 +133,13 @@ private:
     BlockOutputStreamPtr writer;
 };
 
+//static Strings recursiveLSWithRegexpMatching(const String & cur_path, hdfsFS fs, const re2::RE2 & matcher)
+//{
+//    HDFSFileInfo ls;
+//    ls.file_info = hdfsListDirectory(fs.get(), path_without_globs, ls.length);
+//
+//}
+
 }
 
 
@@ -138,12 +151,51 @@ BlockInputStreams
StorageHDFS::read(
     size_t max_block_size,
     unsigned /*num_streams*/)
 {
-    return {std::make_shared<HDFSBlockInputStream>(
-        uri,
-        format_name,
-        getSampleBlock(),
-        context_,
-        max_block_size)};
+    /// Split the URI path into the longest glob-free prefix (a real directory
+    /// that can be listed) and the remainder containing glob metacharacters.
+    Strings path_parts;
+    Poco::URI poco_uri(uri);
+    poco_uri.getPathSegments(path_parts);
+    String path_without_globs;
+    for (const auto & part : path_parts)
+    {
+        /// '*', '?' and '{' are the metacharacters expanded by makeRegexpPatternFromGlobs.
+        if (part.find_first_of("*?{") != std::string::npos)
+            break;
+        path_without_globs.push_back('/');
+        path_without_globs.append(part);
+    }
+
+    /// No globs anywhere in the path: read the single file directly.
+    if (path_without_globs == poco_uri.getPath())
+        return {std::make_shared<HDFSBlockInputStream>(
+            uri,
+            format_name,
+            getSampleBlock(),
+            context_,
+            max_block_size)};
+
+    /// NOTE(review): matcher.ok() is never checked; a pattern rejected by RE2
+    /// would make every FullMatch below fail silently. Consider validating.
+    String path_pattern = makeRegexpPatternFromGlobs(poco_uri.getPath());
+    re2::RE2 matcher(path_pattern);
+
+    path_without_globs.push_back('/');
+    poco_uri.setPath(path_without_globs);
+    HDFSBuilderPtr builder = createHDFSBuilder(poco_uri);
+    HDFSFSPtr fs = createHDFSFS(builder.get());
+
+    HDFSFileInfo ls;
+    ls.file_info = hdfsListDirectory(fs.get(), path_without_globs.data(), &ls.length);
+    BlockInputStreams result;
+    /// hdfsListDirectory returns nullptr on error (e.g. the prefix directory
+    /// does not exist); treat that as "no matching files", not a null deref.
+    if (ls.file_info)
+    {
+        for (int i = 0; i < ls.length; ++i)
+        {
+            /// Only regular files ('F') are read; directories are not descended
+            /// into yet, so globs over intermediate directories are unsupported.
+            if (ls.file_info[i].mKind != 'F')
+                continue;
+            String file_name(ls.file_info[i].mName);
+            /// NOTE(review): libhdfs3 appears to report the FULL path in mName;
+            /// unconditionally prepending the prefix would double it and nothing
+            /// would ever match. Handle both forms defensively - TODO confirm.
+            String cur_path = (!file_name.empty() && file_name.front() == '/')
+                ? file_name
+                : path_without_globs + file_name;
+            if (re2::RE2::FullMatch(cur_path, matcher))
+            {
+                poco_uri.setPath(cur_path);
+                result.push_back(std::make_shared<HDFSBlockInputStream>(
+                    poco_uri.toString(), format_name, getSampleBlock(), context_, max_block_size));
+            }
+        }
+    }
+
+    return result;
 }
 
 void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)