Merge pull request #50063 from KevinyhZou/minor_improve_hdfs_read_buffer

Minor improve HDFS ReadBuffer for read end of file
This commit is contained in:
Kseniia Sumarokova 2023-08-03 17:55:48 +02:00 committed by GitHub
commit 68b48a0bc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -42,8 +42,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
off_t file_offset = 0;
off_t read_until_position = 0;
std::optional<size_t> file_size;
off_t file_size;
explicit ReadBufferFromHDFSImpl(
const std::string & hdfs_uri_,
@ -59,7 +58,6 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
, builder(createHDFSBuilder(hdfs_uri_, config_))
, read_settings(read_settings_)
, read_until_position(read_until_position_)
, file_size(file_size_)
{
fs = createHDFSFS(builder.get());
fin = hdfsOpenFile(fs.get(), hdfs_file_path.c_str(), O_RDONLY, 0, 0, 0);
@ -68,6 +66,22 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
throw Exception(ErrorCodes::CANNOT_OPEN_FILE,
"Unable to open HDFS file: {}. Error: {}",
hdfs_uri + hdfs_file_path, std::string(hdfsGetLastError()));
if (file_size_.has_value())
{
file_size = file_size_.value();
}
else
{
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
{
hdfsCloseFile(fs.get(), fin);
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
}
file_size = static_cast<size_t>(file_info->mSize);
hdfsFreeFileInfo(file_info, 1);
}
}
~ReadBufferFromHDFSImpl() override
@ -75,16 +89,9 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
hdfsCloseFile(fs.get(), fin);
}
size_t getFileSize()
size_t getFileSize() const
{
if (file_size)
return *file_size;
auto * file_info = hdfsGetPathInfo(fs.get(), hdfs_file_path.c_str());
if (!file_info)
throw Exception(ErrorCodes::UNKNOWN_FILE_SIZE, "Cannot find out file size for: {}", hdfs_file_path);
file_size = static_cast<size_t>(file_info->mSize);
return *file_size;
return file_size;
}
bool nextImpl() override
@ -104,6 +111,10 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
num_bytes_to_read = internal_buffer.size();
}
if (file_size != 0 && file_offset >= file_size)
{
return false;
}
ResourceGuard rlock(read_settings.resource_link, num_bytes_to_read);
int bytes_read;