Merge pull request #23318 from kssenii/fix-hdfs-with-spaces

Fix HDFS reading from files with spaces in their paths
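
In brief: ReadBufferFromHDFS previously received one full URI, handed it to the HDFS builder (which parses it as a URI), and re-extracted the file path from it, so paths containing characters that are not valid in a URI, such as spaces, could fail to open. The connection URI and the file path now travel as two separate arguments, and the raw path goes straight to hdfsOpenFile.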
commit 4c1022ac03
alexey-milovidov, 2021-04-20 21:48:41 +03:00 (committed by GitHub)
7 changed files with 94 additions and 74 deletions

File 1 of 7

@@ -9,14 +9,15 @@
#include <IO/Operators.h>
#include <common/logger_useful.h>

namespace DB
{
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int NETWORK_ERROR;
    extern const int EXCESSIVE_ELEMENT_IN_CONFIG;
    extern const int NO_ELEMENTS_IN_CONFIG;
}

const String HDFSBuilderWrapper::CONFIG_PREFIX = "hdfs";

File 2 of 7

@@ -17,6 +17,7 @@
namespace DB
{

namespace detail
{
    struct HDFSFsDeleter
@@ -28,16 +29,14 @@ namespace detail
};
}

struct HDFSFileInfo
{
    hdfsFileInfo * file_info;
    int length;

-    HDFSFileInfo()
-        : file_info(nullptr)
-        , length(0)
-    {
-    }
+    HDFSFileInfo() : file_info(nullptr) , length(0) {}

    HDFSFileInfo(const HDFSFileInfo & other) = delete;
    HDFSFileInfo(HDFSFileInfo && other) = default;
    HDFSFileInfo & operator=(const HDFSFileInfo & other) = delete;
@@ -49,17 +48,30 @@ struct HDFSFileInfo
    }
};

class HDFSBuilderWrapper
{
-    hdfsBuilder * hdfs_builder;
-    String hadoop_kerberos_keytab;
-    String hadoop_kerberos_principal;
-    String hadoop_kerberos_kinit_command = "kinit";
-    String hadoop_security_kerberos_ticket_cache_path;
-    static std::mutex kinit_mtx;
-    std::vector<std::pair<String, String>> config_stor;
+friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
+
+static const String CONFIG_PREFIX;
+
+public:
+    HDFSBuilderWrapper() : hdfs_builder(hdfsNewBuilder()) {}
+
+    ~HDFSBuilderWrapper() { hdfsFreeBuilder(hdfs_builder); }
+
+    HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
+    HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
+
+    hdfsBuilder * get() { return hdfs_builder; }
+
+private:
+    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
+
+    String getKinitCmd();
+
+    void runKinit();

    // hdfs builder relies on an external config data storage
    std::pair<String, String>& keep(const String & k, const String & v)
@@ -67,48 +79,24 @@ class HDFSBuilderWrapper
    {
        return config_stor.emplace_back(std::make_pair(k, v));
    }

+    hdfsBuilder * hdfs_builder;
+    String hadoop_kerberos_keytab;
+    String hadoop_kerberos_principal;
+    String hadoop_kerberos_kinit_command = "kinit";
+    String hadoop_security_kerberos_ticket_cache_path;
+
+    static std::mutex kinit_mtx;
+    std::vector<std::pair<String, String>> config_stor;

    bool need_kinit{false};
-    static const String CONFIG_PREFIX;
-
-private:
-    void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const String & config_path, bool isUser = false);
-    String getKinitCmd();
-    void runKinit();
-
-public:
-    hdfsBuilder *
-    get()
-    {
-        return hdfs_builder;
-    }
-
-    HDFSBuilderWrapper()
-        : hdfs_builder(hdfsNewBuilder())
-    {
-    }
-
-    ~HDFSBuilderWrapper()
-    {
-        hdfsFreeBuilder(hdfs_builder);
-    }
-
-    HDFSBuilderWrapper(const HDFSBuilderWrapper &) = delete;
-    HDFSBuilderWrapper(HDFSBuilderWrapper &&) = default;
-
-    friend HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
};
using HDFSFSPtr = std::unique_ptr<std::remove_pointer_t<hdfsFS>, detail::HDFSFsDeleter>;

// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
/// TODO Allow to tune from query Settings.
HDFSBuilderWrapper createHDFSBuilder(const String & uri_str, const Poco::Util::AbstractConfiguration &);
HDFSFSPtr createHDFSFS(hdfsBuilder * builder);

}
#endif
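
For orientation, a minimal sketch of how these helpers are meant to be chained, mirroring ReadBufferFromHDFSImpl's constructor in the next file (the URI value and the config variable are illustrative, not from the patch):

// Build the client from the connection URI only; no file path is involved
// at this stage, so URI parsing never sees it.
HDFSBuilderWrapper builder = createHDFSBuilder("hdfs://hdfs1:9000", config);
HDFSFSPtr fs = createHDFSFS(builder.get());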

File 3 of 7

@@ -8,6 +8,7 @@
namespace DB
{

namespace ErrorCodes
{
    extern const int NETWORK_ERROR;
@@ -21,34 +22,39 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
    /// HDFS create/open functions are not thread safe
    static std::mutex hdfs_init_mutex;

-    std::string hdfs_uri;
+    String hdfs_uri;
+    String hdfs_file_path;

    hdfsFile fin;
    HDFSBuilderWrapper builder;
    HDFSFSPtr fs;

-    ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
+    explicit ReadBufferFromHDFSImpl(
+        const std::string & hdfs_uri_,
+        const std::string & hdfs_file_path_,
        const Poco::Util::AbstractConfiguration & config_)
-        : hdfs_uri(hdfs_name_),
-        builder(createHDFSBuilder(hdfs_uri, config_))
+        : hdfs_uri(hdfs_uri_)
+        , hdfs_file_path(hdfs_file_path_)
+        , builder(createHDFSBuilder(hdfs_uri_, config_))
    {
        std::lock_guard lock(hdfs_init_mutex);

        fs = createHDFSFS(builder.get());
-        const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
-        const std::string path = hdfs_uri.substr(begin_of_path);
-        fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
+        fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);

        if (fin == nullptr)
-            throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
-                ErrorCodes::CANNOT_OPEN_FILE);
+            throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
+                "Unable to open HDFS file: {}. Error: {}",
+                hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
    }

    int read(char * start, size_t size) const
    {
        int bytes_read = hdfsRead(fs.get(), fin, start, size);
        if (bytes_read < 0)
-            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
-                ErrorCodes::NETWORK_ERROR);
+            throw Exception(ErrorCodes::NETWORK_ERROR,
+                "Fail to read from HDFS: {}, file path: {}. Error: {}",
+                hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
        return bytes_read;
    }
@@ -62,11 +68,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
std::mutex ReadBufferFromHDFS::ReadBufferFromHDFSImpl::hdfs_init_mutex;

-ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
-    const Poco::Util::AbstractConfiguration & config_,
-    size_t buf_size_)
+ReadBufferFromHDFS::ReadBufferFromHDFS(
+    const String & hdfs_uri_,
+    const String & hdfs_file_path_,
+    const Poco::Util::AbstractConfiguration & config_,
+    size_t buf_size_)
    : BufferWithOwnMemory<ReadBuffer>(buf_size_)
-    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
+    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri_, hdfs_file_path_, config_))
{
}
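
A minimal usage sketch of the new constructor shape (the URI, path, and config values are illustrative, not from the patch; StorageHDFS below passes uri and path in exactly this way):

// Connection URI and file path are now separate arguments, so a path with
// spaces never passes through URI parsing.
ReadBufferFromHDFS buf(
    "hdfs://hdfs1:9000",        /// hdfs_uri_ - parsed by createHDFSBuilder
    "/test test test 1.txt",    /// hdfs_file_path_ - given verbatim to hdfsOpenFile
    config);                    /// buf_size_ defaults to DBMS_DEFAULT_BUFFER_SIZE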

File 4 of 7

@@ -7,11 +7,8 @@
#include <IO/BufferWithOwnMemory.h>
#include <string>
#include <memory>
#include <hdfs/hdfs.h>
#include <common/types.h>
#include <Interpreters/Context.h>
@@ -22,13 +19,19 @@ namespace DB
*/
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
    struct ReadBufferFromHDFSImpl;
-    std::unique_ptr<ReadBufferFromHDFSImpl> impl;

public:
-    ReadBufferFromHDFS(const std::string & hdfs_name_, const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);
+    ReadBufferFromHDFS(const String & hdfs_uri_, const String & hdfs_file_path_,
+        const Poco::Util::AbstractConfiguration &, size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE);

    ~ReadBufferFromHDFS() override;

    bool nextImpl() override;

+private:
+    std::unique_ptr<ReadBufferFromHDFSImpl> impl;
};
}
#endif

File 5 of 7

@@ -122,7 +122,7 @@ public:
            current_path = uri + path;

            auto compression = chooseCompressionMethod(path, compression_method);
-            auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(current_path, getContext()->getGlobalContext()->getConfigRef()), compression);
+            auto read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromHDFS>(uri, path, getContext()->getGlobalContext()->getConfigRef()), compression);
            auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, getContext(), max_block_size);

            auto input_stream = std::make_shared<InputStreamFromInputFormat>(input_format);
@@ -271,7 +271,15 @@ Pipe StorageHDFS::read(
    size_t max_block_size,
    unsigned num_streams)
{
-    const size_t begin_of_path = uri.find('/', uri.find("//") + 2);
+    size_t begin_of_path;
+    /// This uri is checked for correctness in constructor of StorageHDFS and never modified afterwards
+    auto two_slash = uri.find("//");
+
+    if (two_slash == std::string::npos)
+        begin_of_path = uri.find('/');
+    else
+        begin_of_path = uri.find('/', two_slash + 2);
+
    const String path_from_uri = uri.substr(begin_of_path);
    const String uri_without_path = uri.substr(0, begin_of_path);
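
The same splitting logic, pulled out as a self-contained sketch (the helper name and example values are ours, not part of the patch; the URI is assumed valid, as the comment above notes):

#include <string>
#include <utility>

// Splits "hdfs://host:port/some path" into {uri_without_path, path_from_uri}.
std::pair<std::string, std::string> splitUri(const std::string & uri)
{
    size_t begin_of_path;
    auto two_slash = uri.find("//");
    if (two_slash == std::string::npos)
        begin_of_path = uri.find('/');
    else
        begin_of_path = uri.find('/', two_slash + 2);
    return {uri.substr(0, begin_of_path), uri.substr(begin_of_path)};
}

// splitUri("hdfs://hdfs1:9000/test test test 1.txt")
//   -> {"hdfs://hdfs1:9000", "/test test test 1.txt"}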
@@ -281,6 +289,9 @@ Pipe StorageHDFS::read(
    auto sources_info = std::make_shared<HDFSSource::SourcesInfo>();
    sources_info->uris = LSWithRegexpMatching("/", fs, path_from_uri);

+    if (sources_info->uris.empty())
+        LOG_WARNING(log, "No file in HDFS matches the path: {}", uri);
+
    for (const auto & column : column_names)
    {
        if (column == "_path")

File 6 of 7

@@ -42,7 +42,7 @@ protected:
        const String & compression_method_);

private:
-    String uri;
+    const String uri;
    String format_name;
    String compression_method;

File 7 of 7

@@ -201,6 +201,15 @@ def test_write_gzip_storage(started_cluster):
    assert started_cluster.hdfs_api.read_gzip_data("/gzip_storage") == "1\tMark\t72.53\n"
    assert node1.query("select * from GZIPHDFSStorage") == "1\tMark\t72.53\n"

+def test_read_files_with_spaces(started_cluster):
+    started_cluster.hdfs_api.write_data("/test test test 1.txt", "1\n")
+    started_cluster.hdfs_api.write_data("/test test test 2.txt", "2\n")
+    started_cluster.hdfs_api.write_data("/test test test 3.txt", "3\n")
+    node1.query("create table test (id UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/test*', 'TSV')")
+    assert node1.query("select * from test order by id") == "1\n2\n3\n"
+
if __name__ == '__main__':
    cluster.start()
    input("Cluster created, press any key to destroy...")