ClickHouse/dbms/src/IO/WriteBufferFromHDFS.cpp

113 lines
2.8 KiB
C++
Raw Normal View History

2019-02-10 17:40:52 +00:00
#include <Common/config.h>
#if USE_HDFS
2019-02-10 17:40:52 +00:00
#include <IO/WriteBufferFromHDFS.h>
2019-01-19 20:17:19 +00:00
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>
2019-02-10 17:40:52 +00:00
namespace DB
{
namespace ErrorCodes
{
    /// Error codes used by WriteBufferFromHDFS; defined centrally elsewhere.
    extern const int NETWORK_ERROR;
    extern const int CANNOT_OPEN_FILE;
    extern const int CANNOT_FSYNC;
    extern const int BAD_ARGUMENTS;
}
/** RAII wrapper around a single writable HDFS file handle.
  * Owns the libhdfs builder/filesystem objects and the open file;
  * the file is closed in the destructor.
  */
struct WriteBufferFromHDFS::WriteBufferFromHDFSImpl
{
    std::string hdfs_uri;
    hdfsFile fout;
    HDFSBuilderPtr builder;
    HDFSFSPtr fs;

    /// @param hdfs_name_ full URI of the form "hdfs://host[:port]/path/to/file".
    /// @throws Exception (BAD_ARGUMENTS) on a malformed URI or if the file already exists;
    ///         Exception (CANNOT_OPEN_FILE) if the URI contains globs or the open fails.
    WriteBufferFromHDFSImpl(const std::string & hdfs_name_)
        : hdfs_uri(hdfs_name_)
        , builder(createHDFSBuilder(hdfs_uri))
        , fs(createHDFSFS(builder.get()))
    {
        /// The path starts at the first '/' after the "scheme://authority" part.
        const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
        if (begin_of_path == std::string::npos)
            throw Exception("Invalid HDFS URI (no path component): " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
        const std::string path = hdfs_uri.substr(begin_of_path);

        /// Glob patterns are only meaningful for reading; refuse to write through them.
        if (path.find_first_of("*?{") != std::string::npos)
            throw Exception("URI '" + hdfs_uri + "' contains globs, so the table is in readonly mode", ErrorCodes::CANNOT_OPEN_FILE);

        /// hdfsExists returns 0 when the path exists; refuse to clobber an existing file.
        if (!hdfsExists(fs.get(), path.c_str()))
            throw Exception("File: " + path + " already exists", ErrorCodes::BAD_ARGUMENTS);

        fout = hdfsOpenFile(fs.get(), path.c_str(), O_WRONLY, 0, 0, 0); /// O_WRONLY means create or overwrite, i.e. implies O_TRUNC here

        if (fout == nullptr)
        {
            throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
                ErrorCodes::CANNOT_OPEN_FILE);
        }
    }

    ~WriteBufferFromHDFSImpl()
    {
        /// Destructor must not throw; the close result is deliberately ignored.
        hdfsCloseFile(fs.get(), fout);
    }

    /// Write up to `size` bytes starting at `start`.
    /// @return the number of bytes actually written (may be less than `size`).
    /// @throws Exception (NETWORK_ERROR) if hdfsWrite reports a failure.
    int write(const char * start, size_t size)
    {
        int bytes_written = hdfsWrite(fs.get(), fout, start, size);
        if (bytes_written < 0)
            throw Exception("Fail to write HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
                ErrorCodes::NETWORK_ERROR);
        return bytes_written;
    }

    /// Flush buffered data to HDFS. @throws on failure (CANNOT_FSYNC).
    void sync()
    {
        int result = hdfsSync(fs.get(), fout);
        if (result < 0)
            throwFromErrno("Cannot HDFS sync " + hdfs_uri + " " + std::string(hdfsGetLastError()),
                ErrorCodes::CANNOT_FSYNC);
    }
};
/// Construct a write buffer for the given HDFS URI; the file is opened
/// immediately inside the pimpl (and the ctor throws if that fails).
WriteBufferFromHDFS::WriteBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
    : BufferWithOwnMemory<WriteBuffer>(buf_size)
    , impl(std::make_unique<WriteBufferFromHDFSImpl>(hdfs_name_))
{
}
void WriteBufferFromHDFS::nextImpl()
{
if (!offset())
return;
size_t bytes_written = 0;
while (bytes_written != offset())
bytes_written += impl->write(working_buffer.begin() + bytes_written, offset() - bytes_written);
}
/// Force previously written data out to HDFS; throws on failure.
void WriteBufferFromHDFS::sync()
{
    impl->sync();
}
WriteBufferFromHDFS::~WriteBufferFromHDFS()
{
    /// Flush any remaining buffered data. Destructors must not throw,
    /// so a failed flush is logged and swallowed.
    try
    {
        next();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
}
#endif