Merge pull request #36153 from bigo-sg/fix_hdfs_read_buffer

Fix bug of read buffer from hdfs
This commit is contained in:
Kseniia Sumarokova 2022-04-12 08:53:39 +02:00 committed by GitHub
commit 9d88f84180
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -30,7 +30,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
off_t offset = 0;
off_t file_offset = 0;
off_t read_until_position = 0;
explicit ReadBufferFromHDFSImpl(
@ -71,13 +71,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
size_t num_bytes_to_read;
if (read_until_position)
{
if (read_until_position == offset)
if (read_until_position == file_offset)
return false;
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
if (read_until_position < file_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", file_offset, read_until_position - 1);
num_bytes_to_read = read_until_position - offset;
num_bytes_to_read = read_until_position - file_offset;
}
else
{
@ -94,28 +94,28 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
offset += bytes_read;
file_offset += bytes_read;
return true;
}
return false;
}
off_t seek(off_t offset_, int whence) override
off_t seek(off_t file_offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only SEEK_SET is supported");
offset = offset_;
int seek_status = hdfsSeek(fs.get(), fin, offset);
file_offset = file_offset_;
int seek_status = hdfsSeek(fs.get(), fin, file_offset);
if (seek_status != 0)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
return offset;
return file_offset;
}
off_t getPosition() override
{
return offset;
return file_offset;
}
};
@ -140,7 +140,7 @@ bool ReadBufferFromHDFS::nextImpl()
auto result = impl->next();
if (result)
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset); /// use the buffer returned by `impl`
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
return result;
}