2019-01-17 11:26:29 +00:00
|
|
|
#include <IO/ReadBufferFromHDFS.h>
|
|
|
|
|
|
|
|
#if USE_HDFS
|
|
|
|
#include <Poco/URI.h>
#include <IO/HDFSCommon.h>
#include <hdfs/hdfs.h>

#include <limits>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
namespace ErrorCodes
{
    /// Error codes thrown by this file; defined elsewhere in the project.
    extern const int CANNOT_OPEN_FILE;
    extern const int NETWORK_ERROR;
}
|
|
|
|
|
|
|
|
struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
|
|
|
|
{
|
2019-01-19 20:17:19 +00:00
|
|
|
Poco::URI hdfs_uri;
|
2019-01-17 11:26:29 +00:00
|
|
|
hdfsFile fin;
|
2019-01-19 20:17:19 +00:00
|
|
|
HDFSBuilderPtr builder;
|
|
|
|
HDFSFSPtr fs;
|
2019-01-19 23:51:03 +00:00
|
|
|
|
2019-01-17 11:26:29 +00:00
|
|
|
ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
|
|
|
|
: hdfs_uri(hdfs_name_)
|
2019-01-19 20:17:19 +00:00
|
|
|
, builder(createHDFSBuilder(hdfs_uri))
|
|
|
|
, fs(createHDFSFS(builder.get()))
|
2019-01-17 11:26:29 +00:00
|
|
|
{
|
|
|
|
|
2019-01-19 20:17:19 +00:00
|
|
|
auto & path = hdfs_uri.getPath();
|
|
|
|
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
|
2019-01-18 18:57:11 +00:00
|
|
|
|
|
|
|
if (fin == nullptr)
|
|
|
|
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
|
2019-01-19 20:17:19 +00:00
|
|
|
ErrorCodes::CANNOT_OPEN_FILE);
|
2019-01-17 11:26:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
int read(char * start, size_t size)
|
|
|
|
{
|
2019-01-19 20:17:19 +00:00
|
|
|
int bytes_read = hdfsRead(fs.get(), fin, start, size);
|
2019-01-17 11:26:29 +00:00
|
|
|
if (bytes_read < 0)
|
2019-01-19 20:17:19 +00:00
|
|
|
throw Exception("Fail to read HDFS file: " + hdfs_uri.toString() + " " + std::string(hdfsGetLastError()),
|
2019-01-17 14:10:30 +00:00
|
|
|
ErrorCodes::NETWORK_ERROR);
|
2019-01-17 11:26:29 +00:00
|
|
|
return bytes_read;
|
|
|
|
}
|
2019-01-19 20:17:19 +00:00
|
|
|
|
|
|
|
~ReadBufferFromHDFSImpl()
|
|
|
|
{
|
|
|
|
hdfsCloseFile(fs.get(), fin);
|
|
|
|
}
|
2019-01-17 11:26:29 +00:00
|
|
|
};
|
|
|
|
|
|
|
|
ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
|
|
|
|
: BufferWithOwnMemory<ReadBuffer>(buf_size)
|
|
|
|
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
|
|
|
|
{
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
bool ReadBufferFromHDFS::nextImpl()
|
|
|
|
{
|
|
|
|
int bytes_read = impl->read(internal_buffer.begin(), internal_buffer.size());
|
|
|
|
|
|
|
|
if (bytes_read)
|
|
|
|
working_buffer.resize(bytes_read);
|
|
|
|
else
|
|
|
|
return false;
|
|
|
|
return true;
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Defined out of line in this file, where ReadBufferFromHDFSImpl is a complete
/// type. Presumably `impl` is a std::unique_ptr (it is built with std::make_unique),
/// which needs the complete type to destroy its pointee.
ReadBufferFromHDFS::~ReadBufferFromHDFS() = default;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|