diff --git a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp index 9c5df639edf..7bbbcf599f8 100644 --- a/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CacheableReadBufferFromRemoteFS.cpp @@ -1,5 +1,7 @@ #include "CacheableReadBufferFromRemoteFS.h" #include +#include +#include namespace ProfileEvents { @@ -24,12 +26,13 @@ CacheableReadBufferFromRemoteFS::CacheableReadBufferFromRemoteFS( const ReadSettings & settings_, size_t read_until_position_) : SeekableReadBuffer(nullptr, 0) - , log(&Poco::Logger::get("CacheableReadBufferFromRemoteFS" + path_ + "")) + , log(&Poco::Logger::get("CacheableReadBufferFromRemoteFS(" + path_ + ")")) , key(cache_->hash(path_)) , cache(cache_) , reader(reader_) , settings(settings_) , read_until_position(read_until_position_) + , path(path_) { } @@ -40,10 +43,6 @@ void CacheableReadBufferFromRemoteFS::initialize(size_t offset, size_t size) /** * Segments in returned list are ordered in ascending order and represent a full contiguous * interval (no holes). Each segment in returned list has state: DOWNLOADED, DOWNLOADING or EMPTY. - * DOWNLOADING means that either the segment is being downloaded by some other thread or that it - * is going to be downloaded by the caller (just space reservation happened). - * EMPTY means that the segment not in cache, not being downloaded and cannot be downloaded - * by the caller (because of not enough space or max elements limit reached). E.g. returned list is never empty. */ if (file_segments_holder->file_segments.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "List of file segments cannot be empty"); @@ -56,7 +55,7 @@ void CacheableReadBufferFromRemoteFS::initialize(size_t offset, size_t size) SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createCacheReadBuffer(size_t offset) const { - return createReadBufferFromFileBase(cache->path(key, offset), settings); + return std::make_shared(cache->path(key, offset), settings.local_fs_buffer_size); } SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegmentPtr file_segment) @@ -129,7 +128,6 @@ SeekableReadBufferPtr CacheableReadBufferFromRemoteFS::createReadBuffer(FileSegm /// TODO: Add seek avoiding for s3 on the lowest level. implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right] - implementation_buffer->seek(range.left, SEEK_SET); return implementation_buffer; } @@ -156,6 +154,8 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() if (current_file_segment_it == file_segments_holder->file_segments.end()) return false; + bool new_impl = false; + if (impl) { auto current_read_range = (*current_file_segment_it)->range(); @@ -169,11 +169,13 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() return false; impl = createReadBuffer(*current_file_segment_it); + new_impl = true; } } else { impl = createReadBuffer(*current_file_segment_it); + new_impl = true; } auto current_read_range = (*current_file_segment_it)->range(); @@ -184,6 +186,12 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() swap(*impl); + if (new_impl) + { + LOG_TEST(log, "SEEK TO {}", file_offset_of_buffer_end); + impl->seek(file_offset_of_buffer_end, SEEK_SET); + } + bool result; auto & file_segment = *current_file_segment_it; @@ -251,6 +259,7 @@ bool CacheableReadBufferFromRemoteFS::nextImpl() LOG_TEST(log, "Returning with {} bytes, last range: {}, current offset: {}", working_buffer.size(), current_read_range.toString(), file_offset_of_buffer_end); + return result; }