Fix bug in reading buffer from HDFS

This commit is contained in:
taiyang-li 2022-04-12 12:01:51 +08:00
parent a840edbefa
commit 9d31c44d39

View File

@ -30,7 +30,7 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
HDFSBuilderWrapper builder;
HDFSFSPtr fs;
off_t offset = 0;
off_t file_offset = 0;
off_t read_until_position = 0;
explicit ReadBufferFromHDFSImpl(
@ -71,13 +71,13 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
size_t num_bytes_to_read;
if (read_until_position)
{
if (read_until_position == offset)
if (read_until_position == file_offset)
return false;
if (read_until_position < offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", offset, read_until_position - 1);
if (read_until_position < file_offset)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read beyond right offset ({} > {})", file_offset, read_until_position - 1);
num_bytes_to_read = read_until_position - offset;
num_bytes_to_read = read_until_position - file_offset;
}
else
{
@ -94,28 +94,28 @@ struct ReadBufferFromHDFS::ReadBufferFromHDFSImpl : public BufferWithOwnMemory<S
{
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
offset += bytes_read;
file_offset += bytes_read;
return true;
}
return false;
}
/// Repositions the HDFS read handle to an absolute position.
///
/// Only `SEEK_SET` is accepted: any other `whence` throws `LOGICAL_ERROR`.
/// The requested position is stored in the member first, then passed to
/// `hdfsSeek`; a non-zero status throws `CANNOT_SEEK_THROUGH_FILE` with the
/// URI and the text from `hdfsGetLastError()`. Returns the stored position.
///
/// NOTE(review): this span appears to be a diff rendering with the +/- markers
/// stripped, so each changed statement is listed twice — the old form using
/// `offset` / `offset_` followed by the new form using `file_offset` /
/// `file_offset_`. Confirm against the real file before treating it as code.
off_t seek(off_t offset_, int whence) override
off_t seek(off_t file_offset_, int whence) override
{
if (whence != SEEK_SET)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Only SEEK_SET is supported");
/// Record the target position before seeking; the member is not restored if hdfsSeek fails below.
offset = offset_;
int seek_status = hdfsSeek(fs.get(), fin, offset);
file_offset = file_offset_;
int seek_status = hdfsSeek(fs.get(), fin, file_offset);
if (seek_status != 0)
throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Fail to seek HDFS file: {}, error: {}", hdfs_uri, std::string(hdfsGetLastError()));
return offset;
return file_offset;
}
/// Returns the position tracked by this implementation: the value last set by
/// `seek` and advanced by the number of bytes read in `nextImpl`.
///
/// NOTE(review): the two `return` lines look like the old (`offset`) and new
/// (`file_offset`) sides of a diff whose +/- markers were stripped — confirm.
off_t getPosition() override
{
return offset;
return file_offset;
}
};
@ -140,7 +140,7 @@ bool ReadBufferFromHDFS::nextImpl()
auto result = impl->next();
if (result)
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset); /// use the buffer returned by `impl`
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); /// use the buffer returned by `impl`
return result;
}