ClickHouse/dbms/src/IO/HDFSCommon.cpp
Weiqing Xu ec3c5a369d support reading from HA mode HDSF
the driver libhdfs3 can support HA mode. When the uri doesn't contain
port num, the libhdfs3 will handle it in HA mode, treat the uri as
nameservice name and read the real host and port from configuration file
as the origin Java client.
the default configuration file is hdfs-client.xml in the working
directory, it also can be set in env variable "LIBHDFS3_CONF".
the format of the configuration file is same with hdfs-site.xml.
2019-07-19 07:32:59 +08:00

58 lines
1.7 KiB
C++

#include <IO/HDFSCommon.h>
#if USE_HDFS
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int NETWORK_ERROR;
}
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
{
auto & host = uri.getHost();
auto port = uri.getPort();
auto & path = uri.getPath();
if (host.empty() || path.empty())
throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
HDFSBuilderPtr builder(hdfsNewBuilder());
if (builder == nullptr)
throw Exception("Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
std::string user_info = uri.getUserInfo();
if (!user_info.empty() && user_info.front() != ':')
{
std::string user;
size_t delim_pos = user_info.find(":");
if (delim_pos != std::string::npos)
user = user_info.substr(0, delim_pos);
else
user = user_info;
hdfsBuilderSetUserName(builder.get(), user.c_str());
}
hdfsBuilderSetNameNode(builder.get(), host.c_str());
hdfsBuilderSetNameNodePort(builder.get(), port);
return builder;
}
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
HDFSFSPtr fs(hdfsBuilderConnect(builder));
if (fs == nullptr)
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
return fs;
}
}
#endif