fix consecutive backward seeks in seekable read buffers

This commit is contained in:
Anton Popov 2022-02-07 17:20:26 +03:00
parent ae1fc94fb5
commit 1b16db72c3
12 changed files with 23 additions and 23 deletions

View File

@ -105,7 +105,7 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o
/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
/// No data, everything discarded
pos = working_buffer.end();
resetWorkingBuffer();
owned_cell.reset();
/// Remember required offset in decompressed block which will be set in

View File

@ -80,7 +80,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t
/// We will discard our working_buffer, but have to account rest bytes
bytes += offset();
/// No data, everything discarded
pos = working_buffer.end();
resetWorkingBuffer();
size_compressed = 0;
/// Remember required offset in decompressed block which will be set in
/// the next ReadBuffer::next() call

View File

@ -243,7 +243,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence
prefetch_future = {};
}
pos = working_buffer.end();
resetWorkingBuffer();
/**
* Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer.

View File

@ -64,7 +64,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence)
throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE);
impl->reset();
pos = working_buffer.end();
resetWorkingBuffer();
return impl->file_offset_of_buffer_end;
}

View File

@ -181,8 +181,8 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence)
off_t offset_after_seek_pos = new_pos - seek_pos;
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
/// Just update the info about the next position in file.

View File

@ -97,6 +97,15 @@ public:
bool isPadded() const { return padded; }
protected:
void resetWorkingBuffer()
{
/// Move position to the end of buffer to trigger call of 'next' on next reading.
/// Discard all data in current working buffer to prevent wrong assumtions on content
/// of buffer, e.g. for optimizations of seeks in seekable buffers.
working_buffer.resize(0);
pos = working_buffer.end();
}
/// Read/write position.
Position pos;

View File

@ -56,7 +56,7 @@ off_t ReadBufferFromEncryptedFile::seek(off_t off, int whence)
offset = new_pos;
/// No more reading from the current working buffer until next() is called.
pos = working_buffer.end();
resetWorkingBuffer();
assert(!hasPendingData());
}

View File

@ -111,7 +111,6 @@ bool ReadBufferFromFileDescriptor::nextImpl()
ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read);
working_buffer = internal_buffer;
working_buffer.resize(bytes_read);
buffer_is_dirty = false;
}
else
return false;
@ -153,10 +152,10 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
}
/// Position is unchanged.
if (!buffer_is_dirty && (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end))
if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)
return new_pos;
if (!buffer_is_dirty && file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos)
&& new_pos <= file_offset_of_buffer_end)
{
/// Position is still inside the buffer.
@ -177,12 +176,8 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence)
off_t offset_after_seek_pos = new_pos - seek_pos;
/// First put position at the end of the buffer so the next read will fetch new data to the buffer.
pos = working_buffer.end();
/// Mark buffer as dirty to disallow further seek optimizations, because fetching data to the buffer
/// is delayed to the next call of 'nextImpl', but it may be not called before next seek.
buffer_is_dirty = true;
/// First reset the buffer so the next read will fetch new data to the buffer.
resetWorkingBuffer();
/// In case of using 'pread' we just update the info about the next position in file.
/// In case of using 'read' we call 'lseek'.
@ -234,7 +229,6 @@ void ReadBufferFromFileDescriptor::rewind()
working_buffer.resize(0);
pos = working_buffer.begin();
file_offset_of_buffer_end = 0;
buffer_is_dirty = true;
}

View File

@ -62,9 +62,6 @@ public:
private:
/// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout.
bool poll(size_t timeout_microseconds);
/// If it's true then we cannot assume on content of buffer to optimize seek calls.
bool buffer_is_dirty = true;
};

View File

@ -187,7 +187,7 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
}
}
pos = working_buffer.end();
resetWorkingBuffer();
if (impl)
{
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);

View File

@ -498,7 +498,7 @@ namespace detail
impl.reset();
}
pos = working_buffer.end();
resetWorkingBuffer();
read_range.begin = offset_;
read_range.end = std::nullopt;
offset_from_begin_pos = 0;

View File

@ -173,7 +173,7 @@ off_t ReadBufferFromHDFS::seek(off_t offset_, int whence)
return getPosition();
}
pos = working_buffer.end();
resetWorkingBuffer();
impl->seek(offset_, whence);
return impl->getPosition();
}