2019-01-17 14:10:30 +00:00
|
|
|
#include <IO/WriteBufferFromHDFS.h>
|
|
|
|
|
|
|
|
#if USE_HDFS
|
|
|
|
#include <Poco/URI.h>
|
|
|
|
#include <hdfs/hdfs.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
|
|
|
/// Error codes used by the HDFS write buffer below.
/// They are only declared here (extern); the definitions are provided elsewhere.
namespace ErrorCodes
{
    extern const int BAD_ARGUMENTS;
    extern const int CANNOT_FSYNC;
    extern const int NETWORK_ERROR;
}
|
|
|
|
|
|
|
|
|
|
|
|
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
|
|
|
|
{
|
2019-01-18 12:48:55 +00:00
|
|
|
struct HDFSBuilderDeleter
|
|
|
|
{
|
2019-01-18 16:17:24 +00:00
|
|
|
void operator()(hdfsBuilder * builder_ptr)
|
2019-01-18 12:48:55 +00:00
|
|
|
{
|
2019-01-18 16:17:24 +00:00
|
|
|
hdfsFreeBuilder(builder_ptr);
|
2019-01-18 12:48:55 +00:00
|
|
|
}
|
|
|
|
};
|
|
|
|
|
2019-01-17 14:10:30 +00:00
|
|
|
std::string hdfs_uri;
|
2019-01-18 12:48:55 +00:00
|
|
|
std::unique_ptr<hdfsBuilder, HDFSBuilderDeleter> builder;
|
2019-01-17 14:10:30 +00:00
|
|
|
hdfsFS fs;
|
|
|
|
hdfsFile fout;
|
|
|
|
|
|
|
|
WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
|
2019-01-18 11:39:36 +00:00
|
|
|
: hdfs_uri(hdfs_name_)
|
2019-01-18 12:48:55 +00:00
|
|
|
, builder(hdfsNewBuilder())
|
2019-01-17 14:10:30 +00:00
|
|
|
{
|
|
|
|
Poco::URI uri(hdfs_name_);
|
|
|
|
auto & host = uri.getHost();
|
|
|
|
auto port = uri.getPort();
|
|
|
|
auto & path = uri.getPath();
|
|
|
|
if (host.empty() || port == 0 || path.empty())
|
|
|
|
{
|
|
|
|
throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
|
|
|
|
}
|
2019-01-18 10:55:03 +00:00
|
|
|
|
2019-01-17 14:10:30 +00:00
|
|
|
// set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
|
|
|
|
/// TODO Allow to tune from query Settings.
|
2019-01-18 12:48:55 +00:00
|
|
|
hdfsBuilderConfSetStr(builder.get(), "input.read.timeout", "60000"); // 1 min
|
|
|
|
hdfsBuilderConfSetStr(builder.get(), "input.write.timeout", "60000"); // 1 min
|
|
|
|
hdfsBuilderConfSetStr(builder.get(), "input.connect.timeout", "60000"); // 1 min
|
2019-01-17 14:10:30 +00:00
|
|
|
|
2019-01-18 12:48:55 +00:00
|
|
|
hdfsBuilderSetNameNode(builder.get(), host.c_str());
|
|
|
|
hdfsBuilderSetNameNodePort(builder.get(), port);
|
|
|
|
fs = hdfsBuilderConnect(builder.get());
|
2019-01-17 14:10:30 +00:00
|
|
|
|
|
|
|
if (fs == nullptr)
|
|
|
|
{
|
2019-01-18 12:48:55 +00:00
|
|
|
throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()),
|
|
|
|
ErrorCodes::NETWORK_ERROR);
|
2019-01-17 14:10:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
fout = hdfsOpenFile(fs, path.c_str(), O_WRONLY, 0, 0, 0);
|
2019-01-18 18:57:11 +00:00
|
|
|
if (fout == nullptr)
|
|
|
|
{
|
|
|
|
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
|
|
|
|
ErrorCodes::NETWORK_ERROR);
|
|
|
|
}
|
|
|
|
|
2019-01-17 14:10:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
~WriteBufferFromHDFSImpl()
|
|
|
|
{
|
|
|
|
hdfsCloseFile(fs, fout);
|
2019-01-18 10:55:03 +00:00
|
|
|
hdfsDisconnect(fs);
|
2019-01-17 14:10:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
int write(const char * start, size_t size)
|
|
|
|
{
|
|
|
|
int bytes_written = hdfsWrite(fs, fout, start, size);
|
|
|
|
if (bytes_written < 0)
|
|
|
|
throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
|
|
|
|
ErrorCodes::NETWORK_ERROR);
|
|
|
|
return bytes_written;
|
|
|
|
}
|
|
|
|
|
|
|
|
void sync()
|
|
|
|
{
|
|
|
|
int result = hdfsSync(fs, fout);
|
|
|
|
if (result < 0)
|
|
|
|
throwFromErrno("Cannot HDFS sync" + hdfs_uri + " " + std::string(hdfsGetLastError()),
|
|
|
|
ErrorCodes::CANNOT_FSYNC);
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
/// Creates a write buffer with `buf_size` bytes of own memory and constructs the
/// HDFS implementation object for `hdfs_name_`.
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
    : BufferWithOwnMemory<WriteBuffer>(buf_size),
      impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
{}
|
|
|
|
|
|
|
|
|
|
|
|
void WriteBufferFromHDFS::nextImpl()
|
|
|
|
{
|
|
|
|
if (!offset())
|
|
|
|
return;
|
|
|
|
|
|
|
|
size_t bytes_written = 0;
|
|
|
|
|
|
|
|
while (bytes_written != offset())
|
|
|
|
bytes_written += impl->write(working_buffer.begin() + bytes_written, offset() - bytes_written);
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
void WriteBufferFromHDFS::sync()
|
|
|
|
{
|
|
|
|
impl->sync();
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Flushes bytes still pending in the working buffer before the implementation
/// object (and with it the HDFS file) is destroyed; previously such bytes were
/// silently dropped if the caller had not called next() itself.
/// next() is inherited from the WriteBuffer base and is expected to hand pending
/// bytes to nextImpl(). A destructor must not throw, so any failure is logged
/// instead of being propagated.
WriteBufferFromHDFS::~WriteBufferFromHDFS()
{
    try
    {
        next();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
|
|
|
|
|
|
|
|
}
|
|
|
|
#endif
|