diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index 4c98df561f1..d6fae0aa7dc 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -173,8 +173,7 @@ std::unique_ptr DiskWebServer::readFile(const String & p if (threadpool_read) { auto reader = IDiskRemote::getThreadPoolReader(); - auto buf = std::make_unique(reader, read_settings.priority, std::move(web_impl)); - return std::make_unique(std::move(buf), min_bytes_for_seek); + return std::make_unique(reader, read_settings.priority, std::move(web_impl)); } else { diff --git a/src/Disks/ReadBufferFromRemoteFSGather.cpp b/src/Disks/ReadBufferFromRemoteFSGather.cpp index 5038698d1e7..4a886ff37a7 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/ReadBufferFromRemoteFSGather.cpp @@ -55,18 +55,28 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(const RemoteMetadata } -size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset) +size_t ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore) { + /** + * Set `data` to current working and internal buffers. + * Internal buffer with size `size`. Working buffer with size 0. + */ set(data, size); + absolute_position = offset; + bytes_to_ignore = ignore; + auto result = nextImpl(); + bytes_to_ignore = 0; + if (result) return working_buffer.size(); + return 0; } -SeekableReadBufferPtr ReadBufferFromRemoteFSGather::initialize() +void ReadBufferFromRemoteFSGather::initialize() { /// One clickhouse file can be split into multiple files in remote fs. auto current_buf_offset = absolute_position; @@ -77,14 +87,20 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::initialize() if (size > current_buf_offset) { - auto buf = createImplementationBuffer(file_path); - buf->seek(current_buf_offset, SEEK_SET); - return buf; + /// Do not create a new buffer if we already have what we need. + if (!current_buf || buf_idx != i) + { + current_buf = createImplementationBuffer(file_path); + buf_idx = i; + } + + current_buf->seek(current_buf_offset, SEEK_SET); + return; } current_buf_offset -= size; } - return nullptr; + current_buf = nullptr; } @@ -92,7 +108,7 @@ bool ReadBufferFromRemoteFSGather::nextImpl() { /// Find first available buffer that fits to given offset. if (!current_buf) - current_buf = initialize(); + initialize(); /// If current buffer has remaining data - use it. if (current_buf) @@ -119,7 +135,17 @@ bool ReadBufferFromRemoteFSGather::nextImpl() bool ReadBufferFromRemoteFSGather::readImpl() { swap(*current_buf); + + /** + * Lazy seek is performed here. + * In asynchronous buffer when seeking to offset in range [pos, pos + min_bytes_for_seek] + * we save how many bytes need to be ignored (new_offset - position() bytes). + */ + if (bytes_to_ignore) + current_buf->ignore(bytes_to_ignore); + auto result = current_buf->next(); + swap(*current_buf); if (result) @@ -129,6 +155,13 @@ bool ReadBufferFromRemoteFSGather::readImpl() } +void ReadBufferFromRemoteFSGather::seek(off_t offset) +{ + absolute_position = offset; + initialize(); +} + + void ReadBufferFromRemoteFSGather::reset() { current_buf.reset(); diff --git a/src/Disks/ReadBufferFromRemoteFSGather.h b/src/Disks/ReadBufferFromRemoteFSGather.h index 28ea347352a..7a858b0e126 100644 --- a/src/Disks/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/ReadBufferFromRemoteFSGather.h @@ -13,9 +13,7 @@ namespace Aws namespace S3 { class S3Client; -} -} - +}} namespace DB { @@ -31,8 +29,10 @@ public: void reset(); + void seek(off_t offset); /// SEEK_SET only. + protected: - size_t readInto(char * data, size_t size, size_t offset); + size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0; @@ -41,7 +41,7 @@ protected: private: bool nextImpl() override; - SeekableReadBufferPtr initialize(); + void initialize(); bool readImpl(); @@ -50,6 +50,10 @@ private: size_t current_buf_idx = 0; size_t absolute_position = 0; + + size_t buf_idx = 0; + + size_t bytes_to_ignore = 0; }; diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp index 0327223dbb9..c69eb893663 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.cpp @@ -31,13 +31,17 @@ namespace ErrorCodes AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRemoteFS( - AsynchronousReaderPtr reader_, Int32 priority_, - std::shared_ptr impl_, size_t buf_size_) + AsynchronousReaderPtr reader_, + Int32 priority_, + std::shared_ptr impl_, + size_t buf_size_, + size_t min_bytes_for_seek_) : ReadBufferFromFileBase(buf_size_, nullptr, 0) , reader(reader_) , priority(priority_) , impl(impl_) , prefetch_buffer(buf_size_) + , min_bytes_for_seek(min_bytes_for_seek_) { } @@ -50,12 +54,21 @@ std::future AsynchronousReadIndirectBufferFromRemot request.size = size; request.offset = absolute_position; request.priority = priority; + + if (bytes_to_ignore) + { + request.ignore = bytes_to_ignore; + bytes_to_ignore = 0; + } return reader->submit(request); } void AsynchronousReadIndirectBufferFromRemoteFS::prefetch() { + if (hasPendingData()) + return; + if (prefetch_future.valid()) return; @@ -156,7 +169,19 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence } pos = working_buffer.end(); - impl->reset(); + + if (static_cast(absolute_position) >= getPosition() + && static_cast(absolute_position) < getPosition() + static_cast(min_bytes_for_seek)) + { + /** + * Lazy ignore. Save number of bytes to ignore and ignore it either for prefetch buffer or current buffer. + */ + bytes_to_ignore = absolute_position - getPosition(); + } + else + { + impl->seek(absolute_position); /// SEEK_SET. + } return absolute_position; } diff --git a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h index ed94f72fd69..3c600562252 100644 --- a/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h +++ b/src/IO/AsynchronousReadIndirectBufferFromRemoteFS.h @@ -34,7 +34,8 @@ public: explicit AsynchronousReadIndirectBufferFromRemoteFS( AsynchronousReaderPtr reader_, Int32 priority_, std::shared_ptr impl_, - size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE); + size_t buf_size_ = DBMS_DEFAULT_BUFFER_SIZE, + size_t min_bytes_for_seek = 1024 * 1024); ~AsynchronousReadIndirectBufferFromRemoteFS() override; @@ -66,6 +67,9 @@ private: Memory<> prefetch_buffer; String buffer_events; + + size_t min_bytes_for_seek; + size_t bytes_to_ignore = 0; }; } diff --git a/src/IO/AsynchronousReader.h b/src/IO/AsynchronousReader.h index 77b4a2f5b22..e4a81623205 100644 --- a/src/IO/AsynchronousReader.h +++ b/src/IO/AsynchronousReader.h @@ -46,6 +46,7 @@ public: size_t size = 0; char * buf = nullptr; int64_t priority = 0; + size_t ignore = 0; }; /// Less than requested amount of data can be returned. diff --git a/src/IO/ThreadPoolRemoteFSReader.cpp b/src/IO/ThreadPoolRemoteFSReader.cpp index ce750f1898a..258d20d62e9 100644 --- a/src/IO/ThreadPoolRemoteFSReader.cpp +++ b/src/IO/ThreadPoolRemoteFSReader.cpp @@ -28,9 +28,9 @@ namespace CurrentMetrics namespace DB { -size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset) +size_t ThreadPoolRemoteFSReader::RemoteFSFileDescriptor::readInto(char * data, size_t size, size_t offset, size_t ignore) { - return reader->readInto(data, size, offset); + return reader->readInto(data, size, offset, ignore); } @@ -49,7 +49,7 @@ std::future ThreadPoolRemoteFSReader::submit(Reques auto * remote_fs_fd = assert_cast(request.descriptor.get()); Stopwatch watch(CLOCK_MONOTONIC); - auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset); + auto bytes_read = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore); watch.stop(); ProfileEvents::increment(ProfileEvents::RemoteFSReadMicroseconds, watch.elapsedMicroseconds()); diff --git a/src/IO/ThreadPoolRemoteFSReader.h b/src/IO/ThreadPoolRemoteFSReader.h index a2d9cbbf779..c300162e214 100644 --- a/src/IO/ThreadPoolRemoteFSReader.h +++ b/src/IO/ThreadPoolRemoteFSReader.h @@ -30,7 +30,7 @@ struct ThreadPoolRemoteFSReader::RemoteFSFileDescriptor : public IFileDescriptor public: RemoteFSFileDescriptor(std::shared_ptr reader_) : reader(reader_) {} - size_t readInto(char * data, size_t size, size_t offset); + size_t readInto(char * data, size_t size, size_t offset, size_t ignore = 0); private: std::shared_ptr reader;