From ab0c601d6364b6fc94dc08a36e9d148d84d25d5d Mon Sep 17 00:00:00 2001 From: Michael Kolupaev Date: Fri, 22 Mar 2024 22:40:38 +0000 Subject: [PATCH] Make userspace page cache work with 'web' disks --- src/Disks/IO/ReadBufferFromWebServer.cpp | 31 +++++++++++++------ src/Disks/IO/ReadBufferFromWebServer.h | 5 +-- .../ObjectStorages/Web/WebObjectStorage.cpp | 4 ++- 3 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/Disks/IO/ReadBufferFromWebServer.cpp b/src/Disks/IO/ReadBufferFromWebServer.cpp index 03300cc0714..7c5de8a13de 100644 --- a/src/Disks/IO/ReadBufferFromWebServer.cpp +++ b/src/Disks/IO/ReadBufferFromWebServer.cpp @@ -21,10 +21,11 @@ namespace ErrorCodes ReadBufferFromWebServer::ReadBufferFromWebServer( const String & url_, ContextPtr context_, + size_t file_size_, const ReadSettings & settings_, bool use_external_buffer_, size_t read_until_position_) - : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0) + : ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_) , log(getLogger("ReadBufferFromWebServer")) , context(context_) , url(url_) @@ -36,7 +37,7 @@ ReadBufferFromWebServer::ReadBufferFromWebServer( } -std::unique_ptr ReadBufferFromWebServer::initialize() +std::unique_ptr ReadBufferFromWebServer::initialize() { Poco::URI uri(url); if (read_until_position) @@ -119,9 +120,8 @@ bool ReadBufferFromWebServer::nextImpl() auto result = impl->next(); - BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset()); - - chassert(working_buffer.begin() == impl->buffer().begin()); + working_buffer = impl->buffer(); + pos = impl->position(); if (result) offset += working_buffer.size(); @@ -132,16 +132,29 @@ bool ReadBufferFromWebServer::nextImpl() off_t ReadBufferFromWebServer::seek(off_t offset_, int whence) { - if (impl) - throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Seek is allowed only before first read attempt from the buffer"); - if (whence != SEEK_SET) throw Exception(ErrorCodes::CANNOT_SEEK_THROUGH_FILE, "Only SEEK_SET mode is allowed"); if (offset_ < 0) throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek position is out of bounds. Offset: {}", offset_); - offset = offset_; + if (impl) + { + if (use_external_buffer) + { + impl->set(internal_buffer.begin(), internal_buffer.size()); + } + + impl->seek(offset_, SEEK_SET); + + working_buffer = impl->buffer(); + pos = impl->position(); + offset = offset_ + available(); + } + else + { + offset = offset_; + } return offset; } diff --git a/src/Disks/IO/ReadBufferFromWebServer.h b/src/Disks/IO/ReadBufferFromWebServer.h index 68ad752bbdb..e4f436f2eb3 100644 --- a/src/Disks/IO/ReadBufferFromWebServer.h +++ b/src/Disks/IO/ReadBufferFromWebServer.h @@ -20,6 +20,7 @@ public: explicit ReadBufferFromWebServer( const String & url_, ContextPtr context_, + size_t file_size_, const ReadSettings & settings_ = {}, bool use_external_buffer_ = false, size_t read_until_position = 0); @@ -39,7 +40,7 @@ public: bool supportsRightBoundedReads() const override { return true; } private: - std::unique_ptr initialize(); + std::unique_ptr initialize(); LoggerPtr log; ContextPtr context; @@ -47,7 +48,7 @@ private: const String url; size_t buf_size; - std::unique_ptr impl; + std::unique_ptr impl; ReadSettings read_settings; diff --git a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp index 7e942a6cf6f..54a501724f9 100644 --- a/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Web/WebObjectStorage.cpp @@ -250,13 +250,15 @@ std::unique_ptr WebObjectStorage::readObject( /// NOLINT std::optional, std::optional) const { + size_t object_size = object.bytes_size; auto read_buffer_creator = - [this, read_settings] + [this, read_settings, object_size] (bool /* restricted_seek */, const std::string & path_) -> std::unique_ptr { return std::make_unique( fs::path(url) / path_, getContext(), + object_size, read_settings, /* use_external_buffer */true); };