From 1b16db72c3a54dd74a985f35c24294f2aa4f661a Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Mon, 7 Feb 2022 17:20:26 +0300 Subject: [PATCH] fix consecutive backward seeks in seekable read buffers --- src/Compression/CachedCompressedReadBuffer.cpp | 2 +- src/Compression/CompressedReadBufferFromFile.cpp | 2 +- .../AsynchronousReadIndirectBufferFromRemoteFS.cpp | 2 +- src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp | 2 +- .../AsynchronousReadBufferFromFileDescriptor.cpp | 4 ++-- src/IO/BufferBase.h | 9 +++++++++ src/IO/ReadBufferFromEncryptedFile.cpp | 2 +- src/IO/ReadBufferFromFileDescriptor.cpp | 14 ++++---------- src/IO/ReadBufferFromFileDescriptor.h | 3 --- src/IO/ReadBufferFromS3.cpp | 2 +- src/IO/ReadWriteBufferFromHTTP.h | 2 +- src/Storages/HDFS/ReadBufferFromHDFS.cpp | 2 +- 12 files changed, 23 insertions(+), 23 deletions(-) diff --git a/src/Compression/CachedCompressedReadBuffer.cpp b/src/Compression/CachedCompressedReadBuffer.cpp index f942f81f5e9..bda86f8c616 100644 --- a/src/Compression/CachedCompressedReadBuffer.cpp +++ b/src/Compression/CachedCompressedReadBuffer.cpp @@ -105,7 +105,7 @@ void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t o /// We will discard our working_buffer, but have to account rest bytes bytes += offset(); /// No data, everything discarded - pos = working_buffer.end(); + resetWorkingBuffer(); owned_cell.reset(); /// Remember required offset in decompressed block which will be set in diff --git a/src/Compression/CompressedReadBufferFromFile.cpp b/src/Compression/CompressedReadBufferFromFile.cpp index b8ce485abc5..cf08d68a7aa 100644 --- a/src/Compression/CompressedReadBufferFromFile.cpp +++ b/src/Compression/CompressedReadBufferFromFile.cpp @@ -80,7 +80,7 @@ void CompressedReadBufferFromFile::seek(size_t offset_in_compressed_file, size_t /// We will discard our working_buffer, but have to account rest bytes bytes += offset(); /// No data, everything discarded - pos = working_buffer.end(); + 
resetWorkingBuffer(); size_compressed = 0; /// Remember required offset in decompressed block which will be set in /// the next ReadBuffer::next() call diff --git a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index c8484e6088d..184fcfe6f8c 100644 --- a/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -243,7 +243,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence prefetch_future = {}; } - pos = working_buffer.end(); + resetWorkingBuffer(); /** * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index c21a55d68ac..cbf265ce741 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -64,7 +64,7 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) throw Exception("Only SEEK_SET or SEEK_CUR modes are allowed.", ErrorCodes::CANNOT_SEEK_THROUGH_FILE); impl->reset(); - pos = working_buffer.end(); + resetWorkingBuffer(); return impl->file_offset_of_buffer_end; } diff --git a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp index 9c92201b3a1..877702f9705 100644 --- a/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp +++ b/src/IO/AsynchronousReadBufferFromFileDescriptor.cpp @@ -181,8 +181,8 @@ off_t AsynchronousReadBufferFromFileDescriptor::seek(off_t offset, int whence) off_t offset_after_seek_pos = new_pos - seek_pos; - /// First put position at the end of the buffer so the next read will fetch new data to the buffer. - pos = working_buffer.end(); + /// First reset the buffer so the next read will fetch new data to the buffer. 
+ resetWorkingBuffer(); /// Just update the info about the next position in file. diff --git a/src/IO/BufferBase.h b/src/IO/BufferBase.h index 198441d8bc1..df384a3f051 100644 --- a/src/IO/BufferBase.h +++ b/src/IO/BufferBase.h @@ -97,6 +97,15 @@ public: bool isPadded() const { return padded; } protected: + void resetWorkingBuffer() + { + /// Move position to the end of buffer to trigger call of 'next' on next reading. + /// Discard all data in current working buffer to prevent wrong assumptions on content + /// of buffer, e.g. for optimizations of seeks in seekable buffers. + working_buffer.resize(0); + pos = working_buffer.end(); + } + /// Read/write position. Position pos; diff --git a/src/IO/ReadBufferFromEncryptedFile.cpp b/src/IO/ReadBufferFromEncryptedFile.cpp index 445c55ac269..7aec6dcde02 100644 --- a/src/IO/ReadBufferFromEncryptedFile.cpp +++ b/src/IO/ReadBufferFromEncryptedFile.cpp @@ -56,7 +56,7 @@ off_t ReadBufferFromEncryptedFile::seek(off_t off, int whence) offset = new_pos; /// No more reading from the current working buffer until next() is called. - pos = working_buffer.end(); + resetWorkingBuffer(); assert(!hasPendingData()); } diff --git a/src/IO/ReadBufferFromFileDescriptor.cpp b/src/IO/ReadBufferFromFileDescriptor.cpp index a5e75ba5f83..d266fb86e0f 100644 --- a/src/IO/ReadBufferFromFileDescriptor.cpp +++ b/src/IO/ReadBufferFromFileDescriptor.cpp @@ -111,7 +111,6 @@ bool ReadBufferFromFileDescriptor::nextImpl() ProfileEvents::increment(ProfileEvents::ReadBufferFromFileDescriptorReadBytes, bytes_read); working_buffer = internal_buffer; working_buffer.resize(bytes_read); - buffer_is_dirty = false; } else return false; @@ -153,10 +152,10 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) } /// Position is unchanged. 
- if (!buffer_is_dirty && (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end)) + if (new_pos + (working_buffer.end() - pos) == file_offset_of_buffer_end) return new_pos; - if (!buffer_is_dirty && file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos) + if (file_offset_of_buffer_end - working_buffer.size() <= static_cast<size_t>(new_pos) && new_pos <= file_offset_of_buffer_end) { /// Position is still inside the buffer. @@ -177,12 +176,8 @@ off_t ReadBufferFromFileDescriptor::seek(off_t offset, int whence) off_t offset_after_seek_pos = new_pos - seek_pos; - /// First put position at the end of the buffer so the next read will fetch new data to the buffer. - pos = working_buffer.end(); - - /// Mark buffer as dirty to disallow further seek optimizations, because fetching data to the buffer - /// is delayed to the next call of 'nextImpl', but it may be not called before next seek. - buffer_is_dirty = true; + /// First reset the buffer so the next read will fetch new data to the buffer. + resetWorkingBuffer(); /// In case of using 'pread' we just update the info about the next position in file. /// In case of using 'read' we call 'lseek'. @@ -234,7 +229,6 @@ void ReadBufferFromFileDescriptor::rewind() working_buffer.resize(0); pos = working_buffer.begin(); file_offset_of_buffer_end = 0; - buffer_is_dirty = true; } diff --git a/src/IO/ReadBufferFromFileDescriptor.h b/src/IO/ReadBufferFromFileDescriptor.h index 48acd5d323e..188cdd709b5 100644 --- a/src/IO/ReadBufferFromFileDescriptor.h +++ b/src/IO/ReadBufferFromFileDescriptor.h @@ -62,9 +62,6 @@ public: private: /// Assuming file descriptor supports 'select', check that we have data to read or wait until timeout. bool poll(size_t timeout_microseconds); - - /// If it's true then we cannot assume on content of buffer to optimize seek calls. 
- bool buffer_is_dirty = true; }; diff --git a/src/IO/ReadBufferFromS3.cpp b/src/IO/ReadBufferFromS3.cpp index f01640cb95b..869432b9484 100644 --- a/src/IO/ReadBufferFromS3.cpp +++ b/src/IO/ReadBufferFromS3.cpp @@ -187,7 +187,7 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence) } } - pos = working_buffer.end(); + resetWorkingBuffer(); if (impl) { ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection); diff --git a/src/IO/ReadWriteBufferFromHTTP.h b/src/IO/ReadWriteBufferFromHTTP.h index 0314fa33f11..ce4d83105c0 100644 --- a/src/IO/ReadWriteBufferFromHTTP.h +++ b/src/IO/ReadWriteBufferFromHTTP.h @@ -498,7 +498,7 @@ namespace detail impl.reset(); } - pos = working_buffer.end(); + resetWorkingBuffer(); read_range.begin = offset_; read_range.end = std::nullopt; offset_from_begin_pos = 0; diff --git a/src/Storages/HDFS/ReadBufferFromHDFS.cpp b/src/Storages/HDFS/ReadBufferFromHDFS.cpp index 15f6c73bc51..0ad55162fb2 100644 --- a/src/Storages/HDFS/ReadBufferFromHDFS.cpp +++ b/src/Storages/HDFS/ReadBufferFromHDFS.cpp @@ -173,7 +173,7 @@ off_t ReadBufferFromHDFS::seek(off_t offset_, int whence) return getPosition(); } - pos = working_buffer.end(); + resetWorkingBuffer(); impl->seek(offset_, whence); return impl->getPosition(); }