From 2d3e08fc748456ee987d9eddf09d036b8f524a98 Mon Sep 17 00:00:00 2001
From: alesapin
Date: Thu, 17 Jan 2019 14:26:29 +0300
Subject: [PATCH] Refactor ReadBufferFromHDFS

Move the libhdfs3-backed implementation out of the header into a pimpl in
ReadBufferFromHDFS.cpp, and link HDFS into clickhouse_common_io instead of dbms.

Review fixes folded in:
- ctor no longer calls hdfsNewBuilder() twice (the second call leaked the
  first builder);
- hdfsOpenFile() result is now checked, so a failed open throws instead of
  crashing later in hdfsRead().
---
 dbms/CMakeLists.txt                |   4 +-
 dbms/src/IO/ReadBufferFromHDFS.cpp | 108 ++++++++++++++++++++++++++++
 dbms/src/IO/ReadBufferFromHDFS.h   | 102 +++++------------------------
 3 files changed, 128 insertions(+), 86 deletions(-)
 create mode 100644 dbms/src/IO/ReadBufferFromHDFS.cpp

diff --git a/dbms/CMakeLists.txt b/dbms/CMakeLists.txt
index 84099810164..4463d571386 100644
--- a/dbms/CMakeLists.txt
+++ b/dbms/CMakeLists.txt
@@ -299,8 +299,8 @@ target_include_directories (dbms SYSTEM BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
 target_include_directories (dbms SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
 
 if (USE_HDFS)
-    target_link_libraries (dbms PRIVATE ${HDFS3_LIBRARY})
-    target_include_directories (dbms SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
+    target_link_libraries (clickhouse_common_io PRIVATE ${HDFS3_LIBRARY})
+    target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${HDFS3_INCLUDE_DIR})
 endif()
 
 if (USE_JEMALLOC)
diff --git a/dbms/src/IO/ReadBufferFromHDFS.cpp b/dbms/src/IO/ReadBufferFromHDFS.cpp
new file mode 100644
index 00000000000..c4b9a2e5d4f
--- /dev/null
+++ b/dbms/src/IO/ReadBufferFromHDFS.cpp
@@ -0,0 +1,108 @@
+#include <IO/ReadBufferFromHDFS.h>
+
+#if USE_HDFS
+#include <Poco/URI.h>
+#include <hdfs/hdfs.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int BAD_ARGUMENTS;
+    extern const int NETWORK_ERROR;
+}
+
+/// Owns the libhdfs3 builder, filesystem connection and open file (pimpl).
+struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
+{
+    std::string hdfs_uri;
+    struct hdfsBuilder * builder;
+    hdfsFS fs;
+    hdfsFile fin;
+
+    ReadBufferFromHDFSImpl(const std::string & hdfs_name_)
+        : hdfs_uri(hdfs_name_)
+        , builder(hdfsNewBuilder())
+    {
+        Poco::URI uri(hdfs_name_);
+        auto & host = uri.getHost();
+        auto port = uri.getPort();
+        auto & path = uri.getPath();
+        if (host.empty() || port == 0 || path.empty())
+        {
+            throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
+        }
+        // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
+        /// TODO Allow to tune from query Settings.
+        hdfsBuilderConfSetStr(builder, "input.read.timeout", "60000"); // 1 min
+        hdfsBuilderConfSetStr(builder, "input.connect.timeout", "60000"); // 1 min
+
+        hdfsBuilderSetNameNode(builder, host.c_str());
+        hdfsBuilderSetNameNodePort(builder, port);
+        fs = hdfsBuilderConnect(builder);
+
+        if (fs == nullptr)
+        {
+            throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
+        }
+
+        fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
+        if (fin == nullptr)
+        {
+            throw Exception("Unable to open HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
+        }
+    }
+
+    ~ReadBufferFromHDFSImpl()
+    {
+        close();
+        hdfsFreeBuilder(builder);
+    }
+
+    /// Close HDFS file before destruction of object.
+    void close()
+    {
+        hdfsCloseFile(fs, fin);
+    }
+
+    int read(char * start, size_t size)
+    {
+        int bytes_read = hdfsRead(fs, fin, start, size);
+        if (bytes_read < 0)
+        {
+            throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
+        }
+        return bytes_read;
+    }
+};
+
+ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size)
+    : BufferWithOwnMemory<ReadBuffer>(buf_size)
+    , impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_))
+{
+}
+
+
+bool ReadBufferFromHDFS::nextImpl()
+{
+    int bytes_read = impl->read(internal_buffer.begin(), internal_buffer.size());
+
+    if (bytes_read)
+        working_buffer.resize(bytes_read);
+    else
+        return false;
+    return true;
+}
+
+const std::string & ReadBufferFromHDFS::getHDFSUri() const
+{
+    return impl->hdfs_uri;
+}
+
+ReadBufferFromHDFS::~ReadBufferFromHDFS()
+{
+}
+
+}
+
+#endif
diff --git a/dbms/src/IO/ReadBufferFromHDFS.h b/dbms/src/IO/ReadBufferFromHDFS.h
index a97ad5ece2f..7e0656098de 100644
--- a/dbms/src/IO/ReadBufferFromHDFS.h
+++ b/dbms/src/IO/ReadBufferFromHDFS.h
@@ -4,94 +4,28 @@
 
 #if USE_HDFS
 #include <IO/ReadBuffer.h>
-#include <Poco/URI.h>
-#include <hdfs/hdfs.h>
 #include <IO/BufferWithOwnMemory.h>
 #include <string>
-
-#ifndef O_DIRECT
-#define O_DIRECT 00040000
-#endif
+#include <memory>
 
 namespace DB
 {
-    namespace ErrorCodes
-    {
-        extern const int BAD_ARGUMENTS;
-        extern const int NETWORK_ERROR;
-    }
-    /** Accepts path to file and opens it, or pre-opened file descriptor.
-     * Closes file by himself (thus "owns" a file descriptor).
-     */
-    class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
-    {
-    protected:
-        std::string hdfs_uri;
-        struct hdfsBuilder *builder;
-        hdfsFS fs;
-        hdfsFile fin;
-    public:
-        ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE)
-            : BufferWithOwnMemory<ReadBuffer>(buf_size), hdfs_uri(hdfs_name_) , builder(hdfsNewBuilder())
-        {
-            Poco::URI uri(hdfs_name_);
-            auto & host = uri.getHost();
-            auto port = uri.getPort();
-            auto & path = uri.getPath();
-            if (host.empty() || port == 0 || path.empty())
-            {
-                throw Exception("Illegal HDFS URI: " + hdfs_uri, ErrorCodes::BAD_ARGUMENTS);
-            }
-            // set read/connect timeout, default value in libhdfs3 is about 1 hour, and too large
-            /// TODO Allow to tune from query Settings.
-            hdfsBuilderConfSetStr(builder, "input.read.timeout", "60000"); // 1 min
-            hdfsBuilderConfSetStr(builder, "input.connect.timeout", "60000"); // 1 min
-
-            hdfsBuilderSetNameNode(builder, host.c_str());
-            hdfsBuilderSetNameNodePort(builder, port);
-            fs = hdfsBuilderConnect(builder);
-
-            if (fs == nullptr)
-            {
-                throw Exception("Unable to connect to HDFS: " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
-            }
-
-            fin = hdfsOpenFile(fs, path.c_str(), O_RDONLY, 0, 0, 0);
-        }
-
-        ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
-
-        ~ReadBufferFromHDFS() override
-        {
-            close();
-            hdfsFreeBuilder(builder);
-        }
-
-        /// Close HDFS connection before destruction of object.
-        void close()
-        {
-            hdfsCloseFile(fs, fin);
-        }
-
-        bool nextImpl() override
-        {
-            int bytes_read = hdfsRead(fs, fin, internal_buffer.begin(), internal_buffer.size());
-            if (bytes_read < 0)
-            {
-                throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()), ErrorCodes::NETWORK_ERROR);
-            }
-
-            if (bytes_read)
-                working_buffer.resize(bytes_read);
-            else
-                return false;
-            return true;
-        }
-
-        const std::string & getHDFSUri() const
-        {
-            return hdfs_uri;
-        }
-    };
+/** Accepts path to file and opens it, or pre-opened file descriptor.
+ * Closes file by himself (thus "owns" a file descriptor).
+ */
+class ReadBufferFromHDFS : public BufferWithOwnMemory<ReadBuffer>
+{
+    struct ReadBufferFromHDFSImpl;
+    std::unique_ptr<ReadBufferFromHDFSImpl> impl;
+public:
+    ReadBufferFromHDFS(const std::string & hdfs_name_, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
+    ReadBufferFromHDFS(ReadBufferFromHDFS &&) = default;
+
+    bool nextImpl() override;
+
+    ~ReadBufferFromHDFS() override;
+
+    const std::string & getHDFSUri() const;
+};
 }
 #endif