2019-01-19 20:17:19 +00:00
|
|
|
#include <IO/HDFSCommon.h>
|
|
|
|
|
|
|
|
#if USE_HDFS
|
|
|
|
#include <Common/Exception.h>
|
2019-07-09 13:16:04 +00:00
|
|
|
|
2019-01-19 20:17:19 +00:00
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
/// Error codes referenced by the HDFS helpers in this file.
/// They are only declared here; the definitions live elsewhere.
namespace ErrorCodes
{
    /// Reported when an HDFS builder or connection cannot be created.
    extern const int NETWORK_ERROR;
    /// Reported when the supplied HDFS URI is rejected.
    extern const int BAD_ARGUMENTS;
}
|
2019-07-09 17:35:47 +00:00
|
|
|
|
2019-01-19 20:17:19 +00:00
|
|
|
/// Creates and configures an HDFS builder from the given URI.
///
/// The host and port of the URI are used as the name node address. If the URI
/// carries user info that does not start with ':', the part before the first ':'
/// (or the whole user info when there is no ':') is set as the user name.
/// The path is only checked for being non-empty; it is not passed to the builder.
///
/// Throws Exception with BAD_ARGUMENTS if the host or the path of the URI is empty,
/// and with NETWORK_ERROR if hdfsNewBuilder() returns a null builder.
HDFSBuilderPtr createHDFSBuilder(const Poco::URI & uri)
{
    const auto & name_node = uri.getHost();
    const auto name_node_port = uri.getPort();
    const auto & file_path = uri.getPath();

    const bool uri_is_usable = !name_node.empty() && !file_path.empty();
    if (!uri_is_usable)
        throw Exception("Illegal HDFS URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);

    HDFSBuilderPtr builder(hdfsNewBuilder());
    if (builder == nullptr)
    {
        const std::string message
            = "Unable to create builder to connect to HDFS: " + uri.toString() + " " + std::string(hdfsGetLastError());
        throw Exception(message, ErrorCodes::NETWORK_ERROR);
    }

    /// The same timeout value is applied to each of these settings, in this order.
    static constexpr const char * timeout_settings[] = {
        "input.read.timeout",
        "input.write.timeout",
        "input.connect.timeout",
    };
    for (const char * setting : timeout_settings)
        hdfsBuilderConfSetStr(builder.get(), setting, "60000"); // 1 min

    const auto & credentials = uri.getUserInfo();
    const bool has_user_name = !credentials.empty() && credentials.front() != ':';
    if (has_user_name)
    {
        /// substr(0, npos) yields the whole string, so a missing ':' needs no special case.
        const std::string user_name = credentials.substr(0, credentials.find(':'));
        hdfsBuilderSetUserName(builder.get(), user_name.c_str());
    }

    hdfsBuilderSetNameNode(builder.get(), name_node.c_str());
    hdfsBuilderSetNameNodePort(builder.get(), name_node_port);

    return builder;
}
|
|
|
|
|
|
|
|
/// Connects to HDFS using an already configured builder and returns the resulting handle.
///
/// Throws Exception with NETWORK_ERROR, including the text from hdfsGetLastError(),
/// if hdfsBuilderConnect() returns a null handle.
HDFSFSPtr createHDFSFS(hdfsBuilder * builder)
{
    HDFSFSPtr connection(hdfsBuilderConnect(builder));
    if (connection != nullptr)
        return connection;

    const std::string message = "Unable to connect to HDFS: " + std::string(hdfsGetLastError());
    throw Exception(message, ErrorCodes::NETWORK_ERROR);
}
|
|
|
|
}
|
|
|
|
#endif
|