ClickHouse/dbms/src/IO/ReadBufferFromHDFS.h

87 lines
2.9 KiB
C++
Raw Normal View History

2018-11-19 08:17:09 +00:00
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Poco/URI.h>
#include <hdfs/hdfs.h>
#include <string>
#include <utility>
#ifndef O_DIRECT
#define O_DIRECT 00040000
#endif
namespace DB
{
/** Read buffer over a single file stored in HDFS.
  *
  * The constructor parses a URI of the form hdfs://host:port/path, connects to the
  * NameNode through libhdfs and opens the file read-only. The object owns the HDFS
  * builder, the filesystem connection and the file handle; all of them are released
  * in the destructor (the file and the connection can also be released earlier via close()).
  */
class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
{
protected:
    std::string hdfs_uri;
    /// Raw libhdfs handles owned by this object; nullptr means "not held".
    struct hdfsBuilder * builder = nullptr;
    hdfsFS fs = nullptr;
    hdfsFile fin = nullptr;

public:
    /// @param hdfs_name_ full HDFS URI; host, port and path must all be present.
    /// @param buf_size   size of the internal read buffer.
    /// @throws Exception if the URI is incomplete, the connection fails or the file cannot be opened.
    ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
        : BufferWithOwnMemory<ReadBuffer>(buf_size), hdfs_uri(hdfs_name_), builder(hdfsNewBuilder())
    {
        /// The destructor does not run if the constructor throws, so every handle acquired
        /// so far must be released here before the exception propagates.
        try
        {
            Poco::URI uri(hdfs_name_);
            const auto & host = uri.getHost();
            auto port = uri.getPort();
            const auto & path = uri.getPath();
            if (host.empty() || port == 0 || path.empty())
                throw Exception("Illegal HDFS URI: " + hdfs_uri);

            if (builder == nullptr)
                throw Exception("Unable to create HDFS builder: " + String(hdfsGetLastError()));

            // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
            /// TODO Allow to tune from query Settings.
            hdfsBuilderConfSetStr(builder, "input.read.timeout", "60000"); // 1 min
            hdfsBuilderConfSetStr(builder, "input.connect.timeout", "60000"); // 1 min
            hdfsBuilderSetNameNode(builder, host.c_str());
            hdfsBuilderSetNameNodePort(builder, port);

            fs = hdfsBuilderConnect(builder);
            if (fs == nullptr)
                throw Exception("Unable to connect to HDFS: " + String(hdfsGetLastError()));

            fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
            if (fin == nullptr)
                throw Exception("Unable to open HDFS file: " + hdfs_uri + " " + String(hdfsGetLastError()));
        }
        catch (...)
        {
            releaseResources();
            throw;
        }
    }

    /// Transfers ownership of all HDFS handles. The moved-from object is left holding none,
    /// so its destructor does not close or free them a second time.
    ReadBufferFromHDFS(ReadBufferFromHDFS && other)
        : BufferWithOwnMemory<ReadBuffer>(std::move(other))
        , hdfs_uri(std::move(other.hdfs_uri))
        , builder(std::exchange(other.builder, nullptr))
        , fs(std::exchange(other.fs, nullptr))
        , fin(std::exchange(other.fin, nullptr))
    {
    }

    ~ReadBufferFromHDFS() override
    {
        releaseResources();
    }

    /// Close the HDFS file and the HDFS connection before destruction of the object.
    /// Safe to call more than once; the destructor calls it again.
    void close()
    {
        if (fin != nullptr)
        {
            hdfsCloseFile(fs, fin);
            fin = nullptr;
        }
        if (fs != nullptr)
        {
            hdfsDisconnect(fs);
            fs = nullptr;
        }
    }

    /// Refill the working buffer from the file.
    /// @return false at end of file, true if at least one byte was read.
    /// @throws Exception if hdfsRead reports an error.
    bool nextImpl() override
    {
        int bytes_read = hdfsRead(fs, fin, internal_buffer.begin(), internal_buffer.size());
        if (bytes_read < 0)
            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + String(hdfsGetLastError()));

        if (bytes_read == 0)
            return false;

        working_buffer.resize(bytes_read);
        return true;
    }

    /// The URI this buffer was constructed from, as passed by the caller.
    const std::string & getHDFSUri() const
    {
        return hdfs_uri;
    }

private:
    /// Release every handle owned by this object (file, connection, builder). Idempotent.
    void releaseResources()
    {
        close();
        if (builder != nullptr)
        {
            hdfsFreeBuilder(builder);
            builder = nullptr;
        }
    }
};
}