This commit is contained in:
kssenii 2021-04-19 19:43:22 +00:00
parent 14800f0c52
commit cc5f49fbee

View File

@ -22,33 +22,37 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl
// Static mutex; the constructor below takes it before calling createHDFSFS and hdfsOpenFile.
static std::mutex hdfs_init_mutex;
// URI string; passed to createHDFSBuilder and reported in error messages.
std::string hdfs_uri;
// Path string handed to hdfsOpenFile and reported in error messages.
std::string hdfs_file_path;
// File handle returned by hdfsOpenFile in the constructor; consumed by hdfsRead.
hdfsFile fin;
// Result of createHDFSBuilder(...); its get() is passed to createHDFSFS.
HDFSBuilderWrapper builder;
// Result of createHDFSFS(builder.get()); its get() is passed to hdfsOpenFile and hdfsRead.
HDFSFSPtr fs;
// Constructor: stores the URI and file path, creates the builder from the URI and config,
// then, while holding hdfs_init_mutex, creates the filesystem handle and opens the file
// with O_RDONLY. Throws Exception with ErrorCodes::CANNOT_OPEN_FILE when hdfsOpenFile
// returns nullptr.
// NOTE(review): this listing looks like a rendered diff without +/- markers. Two signatures,
// two initializer lists, two hdfsOpenFile calls and two throw statements appear below
// (one taking a single hdfs_name_, one taking hdfs_uri_ + hdfs_file_path_). Confirm which
// lines are live before editing.
ReadBufferFromHDFSImpl(const std::string & hdfs_name_,
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
const std::string & hdfs_file_path_,
const Poco::Util::AbstractConfiguration & config_)
: hdfs_uri(hdfs_name_),
builder(createHDFSBuilder(hdfs_uri, config_))
: hdfs_uri(hdfs_uri_)
, hdfs_file_path(hdfs_file_path_)
, builder(createHDFSBuilder(hdfs_uri_, config_))
{
// The lock covers both createHDFSFS and hdfsOpenFile (it is released at the end of the body).
std::lock_guard lock(hdfs_init_mutex);
fs = createHDFSFS(builder.get());
// Variant that derives the path itself: skips past "//" and cuts at the next '/'.
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const std::string path = hdfs_uri.substr(begin_of_path);
fin = hdfsOpenFile(fs.get(), path.c_str(), O_RDONLY, 0, 0, 0);
// Variant that uses the path supplied by the caller.
// The three trailing zeros are presumably "use defaults" for the remaining open options — TODO confirm against the libhdfs header.
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
if (fin == nullptr)
throw Exception("Unable to open HDFS file: " + path + " error: " + std::string(hdfsGetLastError()),
ErrorCodes::CANNOT_OPEN_FILE);
// NOTE(review): if hdfs_uri ends with '/' and hdfs_file_path starts with '/', the
// concatenation below prints a doubled slash in the message — verify against the caller.
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
}
// Reads up to `size` bytes into `start` via hdfsRead and returns the value hdfsRead reported.
// Throws Exception with ErrorCodes::NETWORK_ERROR when hdfsRead returns a negative value;
// the message includes hdfsGetLastError().
// NOTE(review): two throw statements appear below (string-concatenation form and
// format-string form); this looks like old and new diff lines interleaved — confirm which is live.
// NOTE(review): `size` is size_t while the result is int — confirm callers never pass a
// size beyond the int range.
int read(char * start, size_t size) const
{
int bytes_read = hdfsRead(fs.get(), fin, start, size);
if (bytes_read < 0)
throw Exception("Fail to read HDFS file: " + hdfs_uri + " " + std::string(hdfsGetLastError()),
ErrorCodes::NETWORK_ERROR);
throw Exception(ErrorCodes::NETWORK_ERROR,
"Fail to read from HDFS: {}, file path: {}. Error: {}",
hdfs_uri, hdfs_file_path, std::string(hdfsGetLastError()));
return bytes_read;
}
// ReadBufferFromHDFS constructor: allocates the buffer with buf_size_ and creates the impl.
// NOTE(review): the first signature line is embedded in the diff hunk header below, and impl
// is constructed twice in this listing (once in the initializer list from hdfs_name_, once in
// the body from the split URI/path) — this looks like old and new diff lines interleaved.
@ -66,8 +70,12 @@ ReadBufferFromHDFS::ReadBufferFromHDFS(const std::string & hdfs_name_,
const Poco::Util::AbstractConfiguration & config_,
size_t buf_size_)
: BufferWithOwnMemory<ReadBuffer>(buf_size_)
, impl(std::make_unique<ReadBufferFromHDFSImpl>(hdfs_name_, config_))
{
// Locate the first '/' that follows the "//" separator; everything from there on is the file path.
// NOTE(review): the results of both find calls are used unchecked. If no '/' is found,
// begin_of_path is std::string::npos and substr(begin_of_path) throws std::out_of_range —
// confirm callers validate hdfs_name_ first.
const size_t begin_of_path = hdfs_name_.find('/', hdfs_name_.find("//") + 2);
const String hdfs_file_path = hdfs_name_.substr(begin_of_path);
// The URI part is the prefix before the path, with a trailing '/' appended.
const String hdfs_uri = hdfs_name_.substr(0, begin_of_path) + "/";
impl = std::make_unique<ReadBufferFromHDFSImpl>(hdfs_uri, hdfs_file_path, config_);
}