This commit is contained in:
stavrolia 2019-07-24 12:51:02 +03:00
parent dbf1417bd6
commit 6055c61e7f
3 changed files with 80 additions and 9 deletions

View File

@ -390,8 +390,8 @@ if (USE_PROTOBUF)
endif ()
if (USE_HDFS)
target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
target_link_libraries (clickhouse_common_io PUBLIC ${HDFS3_LIBRARY})
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${HDFS3_INCLUDE_DIR})
endif()
if (USE_BROTLI)

View File

@ -27,6 +27,26 @@ struct HDFSFsDeleter
}
struct HDFSFileInfo
{
hdfsFileInfo * file_info;
int length;
HDFSFileInfo()
: file_info(nullptr)
, length(0)
{
}
HDFSFileInfo(const HDFSFileInfo & other) = delete;
HDFSFileInfo(HDFSFileInfo && other) = default;
HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
HDFSFileInfo & operator=(HDFSFileInfo && other) = default;
~HDFSFileInfo()
{
hdfsFreeFileInfo(file_info, length);
}
};
using HDFSBuilderPtr = std::unique_ptr<hdfsBuilder, detail::HDFSBuilderDeleter>;
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;

View File

@ -9,12 +9,17 @@
#include <Parsers/ASTLiteral.h>
#include <IO/ReadBufferFromHDFS.h>
#include <IO/WriteBufferFromHDFS.h>
#include <IO/HDFSCommon.h>
#include <Formats/FormatFactory.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Common/parseGlobs.h>
#include <Poco/URI.h>
#include <re2/re2.h>
#include <re2/stringpiece.h>
#include <hdfs/hdfs.h>
namespace DB
{
@ -127,6 +132,13 @@ private:
BlockOutputStreamPtr writer;
};
//static Strings recursiveLSWithRegexpMatching(const String & cur_path, hdfsFS fs, const re2::RE2 & matcher)
//{
// HDFSFileInfo ls;
// ls.file_info = hdfsListDirectory(fs.get(), path_without_globs, ls.length);
//
//}
}
@ -138,12 +150,51 @@ BlockInputStreams StorageHDFS::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
return {std::make_shared<HDFSBlockInputStream>(
uri,
format_name,
getSampleBlock(),
context_,
max_block_size)};
Strings path_parts;
Poco::URI poco_uri(uri);
poco_uri.getPathSegments(path_parts);
String path_without_globs;
for (const auto & part : path_parts)
{
if ((part.find('*') != std::string::npos) || (part.find('?') != std::string::npos) || (part.find('{') != std::string::npos))
break;
path_without_globs.push_back('/');
path_without_globs.append(part);
}
if (path_without_globs == poco_uri.getPath())
return {std::make_shared<HDFSBlockInputStream>(
uri,
format_name,
getSampleBlock(),
context_,
max_block_size)};
String path_pattern = makeRegexpPatternFromGlobs(poco_uri.getPath());
re2::RE2 matcher(path_pattern);
path_without_globs.push_back('/');
poco_uri.setPath(path_without_globs);
HDFSBuilderPtr builder = createHDFSBuilder(poco_uri);
HDFSFSPtr fs = createHDFSFS(builder.get());
// Strings res_paths = recursiveLSWithRegexpMatching(path_without_globs, fs.get(), matcher);
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), path_without_globs.data(), &ls.length);
BlockInputStreams result;
for (int i = 0; i < ls.length; ++i)
{
if (ls.file_info[i].mKind == 'F')
{
String cur_path = path_without_globs + String(ls.file_info[i].mName);
if (re2::RE2::FullMatch(cur_path, matcher))
{
poco_uri.setPath(cur_path);
result.push_back(
std::make_shared<HDFSBlockInputStream>(poco_uri.toString(), format_name, getSampleBlock(), context_,
max_block_size));
}
}
}
return result;
}
void StorageHDFS::rename(const String & /*new_path_to_db*/, const String & new_database_name, const String & new_table_name)