diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index b2dfc416e01..8cf0cfa85fb 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -249,6 +249,7 @@ FileSegmentsHolder LRUFileCache::getOrSet(const Key & key, size_t offset, size_t } } + assert(!file_segments.empty()); return FileSegmentsHolder(std::move(file_segments)); } @@ -303,7 +304,7 @@ LRUFileCache::FileSegmentCell * LRUFileCache::setImpl( if (!size) return nullptr; - LOG_TEST(log, "Set. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size); + LOG_TEST(log, "SetImpl. Key: {}, offset: {}, size: {}", keyToStr(key), offset, size); switch (state) { @@ -715,7 +716,7 @@ FileSegment::State FileSegment::wait() { LOG_TEST(&Poco::Logger::get("kssenii"), "Waiting on: {}", range().toString()); - cv.wait_for(segment_lock, std::chrono::seconds(60)); + cv.wait_for(segment_lock, std::chrono::seconds(60)); /// TODO: use value defined by setting break; } case State::DOWNLOADED:[[fallthrough]]; diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index 303ccbcf02e..624f339327a 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -182,6 +182,9 @@ struct FileSegmentsHolder : boost::noncopyable ~FileSegmentsHolder() { + /// CacheableReadBufferFromRemoteFS removes completed file segments from FileSegmentsHolder, so + /// in destruction here remain only uncompleted file segments. + for (auto & segment : file_segments) { /// In general file segment is completed by downloader by calling segment->complete() diff --git a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp index 7bbbcf599f8..fcc9fb1446e 100644 --- a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp @@ -1,7 +1,9 @@ #include "CacheableReadBufferFromRemoteFS.h" + +#include #include #include -#include + namespace ProfileEvents { @@ -32,7 +34,6 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS( , reader(reader_) , settings(settings_) , read_until_position(read_until_position_) - , path(path_) { } @@ -132,18 +133,29 @@ SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegm return implementation_buffer; } -void CacheableReadBufferFromRemoteFS::completeFileSegmentAndGetNext() +bool CacheableReadBufferFromRemoteFS::completeFileSegmentAndGetNext() { + LOG_TEST(log, "Completed segment: {}", (*current_file_segment_it)->range().toString()); + auto file_segment_it = current_file_segment_it++; + auto range = (*file_segment_it)->range(); assert(file_offset_of_buffer_end > range.right); if (download_current_segment) - (*current_file_segment_it)->complete(); + (*file_segment_it)->complete(); /// Do not hold pointer to file segment if it is not needed anymore /// so can become releasable and can be evicted from cache. file_segments_holder->file_segments.erase(file_segment_it); + + if (current_file_segment_it == file_segments_holder->file_segments.end()) + return false; + + impl = createReadBuffer(*current_file_segment_it); + + LOG_TEST(log, "New segment: {}", (*current_file_segment_it)->range().toString()); + return true; } bool CacheableReadBufferFromRemoteFS::nextImpl() @@ -154,8 +166,6 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() if (current_file_segment_it == file_segments_holder->file_segments.end()) return false; - bool new_impl = false; - if (impl) { auto current_read_range = (*current_file_segment_it)->range(); @@ -163,19 +173,16 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() if (file_offset_of_buffer_end > current_read_range.right) { - completeFileSegmentAndGetNext(); - - if (current_file_segment_it == file_segments_holder->file_segments.end()) + if (!completeFileSegmentAndGetNext()) return false; - - impl = createReadBuffer(*current_file_segment_it); - new_impl = true; } } else { impl = createReadBuffer(*current_file_segment_it); - new_impl = true; + + /// Seek is required only for first file segment in the list of file segments. + impl->seek(file_offset_of_buffer_end, SEEK_SET); } auto current_read_range = (*current_file_segment_it)->range(); @@ -183,15 +190,10 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() assert(current_read_range.left <= file_offset_of_buffer_end); assert(current_read_range.right >= file_offset_of_buffer_end); + assert(!internal_buffer.empty()); swap(*impl); - if (new_impl) - { - LOG_TEST(log, "SEEK TO {}", file_offset_of_buffer_end); - impl->seek(file_offset_of_buffer_end, SEEK_SET); - } - bool result; auto & file_segment = *current_file_segment_it; @@ -217,7 +219,7 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() file_segment->complete(); /// Note: If exception happens in another place -- out of scope of this buffer, then - /// downloader's FileSegmentsHolder is responsible to set ERROR state and call notify. + /// downloader's FileSegmentsHolder is responsible to call file_segment->complete(). /// (download_path (if exists) is removed from inside cache) throw; @@ -257,8 +259,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() if (file_offset_of_buffer_end > current_read_range.right) completeFileSegmentAndGetNext(); - LOG_TEST(log, "Returning with {} bytes, last range: {}, current offset: {}", - working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end); + LOG_TEST(log, "Key: {}. Returning with {} bytes, current range: {}, current offset: {}", + getHexUIntLowercase(key), working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end); return result; } @@ -298,14 +300,4 @@ off_t CacheableReadBufferFromRemoteFS::getPosition() return file_offset_of_buffer_end - available(); } -CacheableReadBufferFromRemoteFS::~CacheableReadBufferFromRemoteFS() -{ - std::optional range; - if (download_current_segment - && current_file_segment_it != file_segments_holder->file_segments.end()) - range = (*current_file_segment_it)->range(); - LOG_TEST(log, "Buffer reset. Current offset: {}, last download range: {}, state: {}", - file_offset_of_buffer_end, range ? range->toString() : "None", (*current_file_segment_it)->state()); -} - } diff --git a/src/Disks/IO/CacheableReadBufferFromRemoteFS.h b/src/Disks/IO/CacheableReadBufferFromRemoteFS.h index c3525f15015..6f0b9e6d432 100644 --- a/src/Disks/IO/CacheableReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CacheableReadBufferFromRemoteFS.h @@ -25,8 +25,6 @@ public: off_t getPosition() override; - ~CacheableReadBufferFromRemoteFS() override; - private: void initialize(size_t offset, size_t size); @@ -34,7 +32,7 @@ private: SeekableReadBufferPtr createReadBuffer(FileSegmentPtr file_segment); size_t getTotalSizeToRead(); - void completeFileSegmentAndGetNext(); + bool completeFileSegmentAndGetNext(); Poco::Logger * log; FileCache::Key key; diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp index 738f93e2967..315772eedf6 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.cpp @@ -27,8 +27,9 @@ namespace DB { #if USE_AWS_S3 -SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const String & path) { + current_path = path; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; auto reader = std::make_unique( @@ -49,8 +50,9 @@ SeekableReadBufferPtr ReadBufferFromS3Gather::createImplementationBuffer(const S #if USE_AZURE_BLOB_STORAGE -SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementationBuffer(const String & path) { + current_path = path; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; return std::make_unique(blob_container_client, path, max_single_read_retries, max_single_download_retries, settings.remote_fs_buffer_size, use_external_buffer, read_until_position); @@ -58,15 +60,16 @@ SeekableReadBufferPtr ReadBufferFromAzureBlobStorageGather::createImplementation #endif -SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromWebServerGather::createImplementationBuffer(const String & path) { + current_path = path; bool use_external_buffer = settings.remote_fs_method == RemoteFSReadMethod::threadpool; return std::make_unique(fs::path(uri) / path, context, settings, use_external_buffer, read_until_position); } #if USE_HDFS -SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) const +SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBuffer(const String & path) { return std::make_unique(hdfs_uri, fs::path(hdfs_directory) / path, config, buf_size); } @@ -135,6 +138,8 @@ void ReadBufferFromRemoteFSGather::initialize() bool ReadBufferFromRemoteFSGather::nextImpl() { + assert(!internal_buffer.empty()); + /// Find first available buffer that fits to given offset. if (!current_buf) initialize(); @@ -210,7 +215,7 @@ void ReadBufferFromRemoteFSGather::reset() String ReadBufferFromRemoteFSGather::getFileName() const { - return canonical_path; + return current_path; } diff --git a/src/Disks/IO/ReadBufferFromRemoteFSGather.h b/src/Disks/IO/ReadBufferFromRemoteFSGather.h index d5c5911b05c..d14592330cd 100644 --- a/src/Disks/IO/ReadBufferFromRemoteFSGather.h +++ b/src/Disks/IO/ReadBufferFromRemoteFSGather.h @@ -52,12 +52,14 @@ public: bool initialized() const { return current_buf != nullptr; } protected: - virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) const = 0; + virtual SeekableReadBufferPtr createImplementationBuffer(const String & path) = 0; RemoteMetadata metadata; size_t read_until_position = 0; + String current_path; + private: bool nextImpl() override; @@ -102,7 +104,7 @@ public: { } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path) override; private: std::shared_ptr client_ptr; @@ -133,7 +135,7 @@ public: { } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path) override; private: std::shared_ptr blob_container_client; @@ -160,7 +162,7 @@ public: { } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path) override; private: String uri; @@ -189,7 +191,7 @@ public: hdfs_uri = hdfs_uri_.substr(0, begin_of_path); } - SeekableReadBufferPtr createImplementationBuffer(const String & path) const override; + SeekableReadBufferPtr createImplementationBuffer(const String & path) override; private: const Poco::Util::AbstractConfiguration & config; diff --git a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp index c21a55d68ac..064168553a8 100644 --- a/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/ReadIndirectBufferFromRemoteFS.cpp @@ -13,7 +13,9 @@ namespace ErrorCodes ReadIndirectBufferFromRemoteFS::ReadIndirectBufferFromRemoteFS( - std::shared_ptr impl_) : impl(std::move(impl_)) + std::shared_ptr impl_) + : ReadBufferFromFileBase(DBMS_DEFAULT_BUFFER_SIZE, nullptr, 0) + , impl(std::move(impl_)) { } @@ -72,6 +74,8 @@ off_t ReadIndirectBufferFromRemoteFS::seek(off_t offset_, int whence) bool ReadIndirectBufferFromRemoteFS::nextImpl() { + assert(!internal_buffer.empty()); + /// Transfer current position and working_buffer to actual ReadBuffer swap(*impl); /// Position and working_buffer will be updated in next() call diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index ca15b0b3d86..050f1a34965 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -238,7 +238,6 @@ std::unique_ptr DiskS3::readFile(const String & path, co } else { - /// TODO: Pass cache for non-asynchronous reader too. auto buf = std::make_unique(std::move(s3_impl)); return std::make_unique(std::move(buf), settings->min_bytes_for_seek); }