ClickHouse/dbms/src/IO/ReadBufferFromHDFS.h

88 lines
2.8 KiB
C++
Raw Normal View History

2018-11-19 08:17:09 +00:00
#pragma once
#include <algorithm>
#include <limits>
#include <string>
#include <utility>
#include <IO/ReadBuffer.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
#include <IO/BufferWithOwnMemory.h>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace DB
{
/** Read buffer that streams the contents of a single HDFS file.
  *
  * The constructor takes a URI with a host, a port and a path, connects to the
  * name node and opens the file read-only. The object owns the HDFS builder,
  * the file system connection and the file handle; close() releases the file
  * and the connection, and the destructor releases whatever is still held.
  */
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
protected:
    std::string hdfs_uri;
    /// All three handles are null while not owned, so that release() can be called in any state.
    struct hdfsBuilder * builder = nullptr;
    hdfsFS fs = nullptr;
    hdfsFile fin = nullptr;

public:
    /** Connects to HDFS and opens the file named by hdfs_name_.
      *
      * @param hdfs_name_  URI of the file; host, port and path must all be present.
      * @param buf_size    Size of the internal buffer in bytes.
      * @throws Exception  If the URI is incomplete, or the builder cannot be created,
      *                    or the connection or the file cannot be opened.
      */
    ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
        : BufferWithOwnMemory<ReadBuffer>(buf_size), hdfs_uri(hdfs_name_)
    {
        Poco::URI uri(hdfs_name_);
        const auto & host = uri.getHost();
        const auto port = uri.getPort();
        const auto & path = uri.getPath();

        /// Validate before acquiring any HDFS resource, so that nothing has to be released on this path.
        if (host.empty() || port == 0 || path.empty())
            throw Exception("Illegal HDFS URI : " + hdfs_uri);

        builder = hdfsNewBuilder();
        if (builder == nullptr)
            throw Exception("Unable to create HDFS builder: " + lastError());

        /// Set read/connect timeouts explicitly: the libhdfs3 default is about 1 hour, which is too large.
        hdfsBuilderConfSetStr(builder, "input.read.timeout", "60000"); // 1 min
        hdfsBuilderConfSetStr(builder, "input.connect.timeout", "60000"); // 1 min
        hdfsBuilderSetNameNode(builder, host.c_str());
        hdfsBuilderSetNameNodePort(builder, port);

        /// The destructor does not run when the constructor throws,
        /// so everything acquired so far is released by hand before throwing.
        fs = hdfsBuilderConnect(builder);
        if (fs == nullptr)
        {
            const std::string error = lastError();
            release();
            throw Exception("Unable to connect to HDFS:" + error);
        }

        fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
        if (fin == nullptr)
        {
            const std::string error = lastError();
            release();
            throw Exception("Unable to open HDFS file: " + hdfs_uri + " " + error);
        }
    }

    /// Transfers ownership of the HDFS handles; the source is left with null handles
    /// so that its destructor does not release them a second time.
    ReadBufferFromHDFS(ReadBufferFromHDFS && other)
        : BufferWithOwnMemory<ReadBuffer>(std::move(other))
        , hdfs_uri(std::move(other.hdfs_uri))
        , builder(std::exchange(other.builder, nullptr))
        , fs(std::exchange(other.fs, nullptr))
        , fin(std::exchange(other.fin, nullptr))
    {
    }

    ~ReadBufferFromHDFS() override
    {
        release();
    }

    /// Closes the file and the HDFS connection. Safe to call more than once;
    /// reading after close() throws.
    void close()
    {
        if (fs != nullptr && fin != nullptr)
            hdfsCloseFile(fs, fin);
        fin = nullptr;

        if (fs != nullptr)
            hdfsDisconnect(fs);
        fs = nullptr;
    }

    /** Reads the next chunk of the file into the internal buffer.
      *
      * @return true if at least one byte was read, false if hdfsRead returned 0.
      * @throws Exception if the file has been closed or hdfsRead reports an error.
      */
    bool nextImpl() override
    {
        if (fs == nullptr || fin == nullptr)
            throw Exception("Fail to read HDFS file: " + hdfs_uri + " file is closed");

        /// hdfsRead takes the length as tSize, so clamp the request instead of letting it wrap.
        const size_t max_chunk = static_cast<size_t>(std::numeric_limits<tSize>::max());
        const tSize to_read = static_cast<tSize>(std::min<size_t>(internal_buffer.size(), max_chunk));

        const tSize done = hdfsRead(fs, fin, internal_buffer.begin(), to_read);
        if (done < 0)
            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + lastError());

        if (done == 0)
            return false;

        working_buffer.resize(done);
        return true;
    }

    /// Returns the URI this buffer was constructed with.
    const std::string & getHDFSUri() const
    {
        return hdfs_uri;
    }

private:
    /// Returns the last libhdfs error message, or an empty string if there is none.
    static std::string lastError()
    {
        const char * message = hdfsGetLastError();
        return message ? std::string(message) : std::string();
    }

    /// Releases every handle that is still owned. Return codes are ignored
    /// because this runs from the destructor, which must not throw.
    void release()
    {
        close();

        if (builder != nullptr)
            hdfsFreeBuilder(builder);
        builder = nullptr;
    }
};
}