From 886b300b8d3bcea8cf2a3fa77e4a407105bdad0a Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 15 Feb 2022 11:27:44 +0100 Subject: [PATCH] Less seeks --- src/Common/FileSegment.cpp | 10 +- src/Common/FileSegment.h | 2 + src/Disks/IO/CachedReadBufferFromRemoteFS.cpp | 112 ++++++++++-------- src/Disks/IO/CachedReadBufferFromRemoteFS.h | 2 + 4 files changed, 72 insertions(+), 54 deletions(-) diff --git a/src/Common/FileSegment.cpp b/src/Common/FileSegment.cpp index 0f3e8ed0515..a9ea90c9f8f 100644 --- a/src/Common/FileSegment.cpp +++ b/src/Common/FileSegment.cpp @@ -274,7 +274,7 @@ void FileSegment::complete() { std::lock_guard segment_lock(mutex); - if (download_state == State::SKIP_CACHE) + if (download_state == State::SKIP_CACHE || detached) return; if (downloaded_size == range().size() && download_state != State::DOWNLOADED) @@ -295,13 +295,13 @@ void FileSegment::complete() void FileSegment::completeImpl(std::lock_guard & /* segment_lock */) { + std::lock_guard cache_lock(cache->mutex); + bool download_can_continue = false; if (download_state == State::PARTIALLY_DOWNLOADED || download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION) { - std::lock_guard cache_lock(cache->mutex); - bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock); download_can_continue = !is_last_holder && download_state == State::PARTIALLY_DOWNLOADED; @@ -312,6 +312,8 @@ void FileSegment::completeImpl(std::lock_guard & /* segment_lock */) download_state = State::SKIP_CACHE; LOG_TEST(log, "Remove cell {} (downloaded: {})", range().toString(), downloaded_size); cache->remove(key(), offset(), cache_lock); + + detached = true; } else if (is_last_holder) { @@ -323,6 +325,8 @@ void FileSegment::completeImpl(std::lock_guard & /* segment_lock */) */ LOG_TEST(log, "Resize cell {} to downloaded: {}", range().toString(), downloaded_size); cache->reduceSizeToDownloaded(key(), offset(), cache_lock); + + detached = true; } } } diff --git a/src/Common/FileSegment.h b/src/Common/FileSegment.h index d2807aead9c..4b77fd04acc 100644 --- a/src/Common/FileSegment.h +++ b/src/Common/FileSegment.h @@ -141,6 +141,8 @@ private: FileCache * cache; Poco::Logger * log; + + bool detached = false; }; struct FileSegmentsHolder : boost::noncopyable diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp index ef4a0885901..67e838a1d50 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.cpp @@ -80,18 +80,16 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getRemoteFSReadBuffer([[mayb * Implementation buffer from segment1 is passed to segment2 once segment1 is loaded. */ - // auto remote_fs_segment_reader = file_segment->getRemoteFileReader(); + auto remote_fs_segment_reader = file_segment->getRemoteFileReader(); - // if (remote_fs_segment_reader) - // return remote_fs_segment_reader; + if (remote_fs_segment_reader) + return remote_fs_segment_reader; - // remote_fs_segment_reader = remote_file_reader_creator(); - // file_segment->setRemoteFileReader(remote_fs_segment_reader); + remote_fs_segment_reader = remote_file_reader_creator(); + file_segment->setRemoteFileReader(remote_fs_segment_reader); - // return remote_fs_segment_reader; - - remote_file_reader = remote_file_reader_creator(); - return remote_file_reader; + ///TODO: add check for pending data + return remote_fs_segment_reader; } case ReadType::REMOTE_FS_READ_BYPASS_CACHE: { @@ -190,19 +188,6 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( break; } - case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: - { - /// If downloader failed before downloading anything, it is determined - /// whether continuation is possible. In case of no continuation and - /// downloaded_size == 0 - cache cell is removed and state is switched to SKIP_CACHE. - - // assert(file_segment->downloadOffset() > 0); - - read_type = ReadType::CACHED; - implementation_buffer = getCacheReadBuffer(range.left); - - break; - } case FileSegment::State::PARTIALLY_DOWNLOADED: { size_t download_offset = file_segment->downloadOffset(); @@ -225,25 +210,6 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( } LOG_TEST(log, "Current download offset: {}, file offset of buffer end: {}", download_offset, file_offset_of_buffer_end); - if (download_offset + 1 < file_offset_of_buffer_end) - { - /// segment{1} - /// cache: [_____|___________ - /// ^ - /// download_offset - /// requested_range: [__________] - /// ^ - /// file_offset_of_buffer_end - - /// TODO: optimize this with predownloading - read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); - - // assert(first_segment_read_in_range); - // bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset() - 1; - - break; - } auto downloader_id = file_segment->getOrSetDownloader(); if (downloader_id == file_segment->getCallerId()) @@ -251,12 +217,44 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); + if (download_offset < file_offset_of_buffer_end) + { + /// segment{1} + /// cache: [_____|___________ + /// ^ + /// download_offset + /// requested_range: [__________] + /// ^ + /// file_offset_of_buffer_end + + assert(file_offset_of_buffer_end > file_segment->downloadOffset()); + bytes_to_predownload = file_offset_of_buffer_end - file_segment->downloadOffset(); + } + break; } download_state = file_segment->state(); continue; } + case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: + { + size_t download_offset = file_segment->downloadOffset(); + bool can_start_from_cache = download_offset > file_offset_of_buffer_end; + + if (can_start_from_cache) + { + read_type = ReadType::CACHED; + implementation_buffer = getCacheReadBuffer(range.left); + } + else + { + read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; + implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); + } + + break; + } } break; @@ -273,6 +271,9 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( /// TODO: For remote FS read need to set maximum possible right offset -- of /// the last empty segment and in s3 buffer check > instead of !=. + + // auto last_non_downloaded_offset = getLastNonDownloadedOffset(); + // implementation_buffer->setReadUntilPosition(last_non_downloaded_offset ? *last_non_downloaded_offset : range.right + 1); /// [..., range.right] implementation_buffer->setReadUntilPosition(range.right + 1); /// [..., range.right] switch (read_type) @@ -305,17 +306,9 @@ SeekableReadBufferPtr CachedReadBufferFromRemoteFS::getReadBufferForFileSegment( { size_t download_offset = file_segment->downloadOffset(); implementation_buffer->seek(download_offset, SEEK_SET); - - LOG_TEST(log, "Impl buffer seek to download offset: {}", download_offset); } else { - LOG_TEST(log, "Impl buffer seek to : {}, buffer reading from {} to {}, file segment info: {}", - file_offset_of_buffer_end, - implementation_buffer->getRemainingReadRange().left, - *implementation_buffer->getRemainingReadRange().right, - file_segment->getInfoForLog()); - implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET); } @@ -575,6 +568,7 @@ bool CachedReadBufferFromRemoteFS::nextImpl() } else { + download_current_segment = false; file_segment->complete(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); LOG_DEBUG(log, "No space left in cache, will continue without cache download"); } @@ -600,10 +594,10 @@ bool CachedReadBufferFromRemoteFS::nextImpl() } } - size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; - + /// Local filesystem does not support bounded reads. if (read_type == ReadType::CACHED && std::next(current_file_segment_it) == file_segments_holder->file_segments.end()) { + size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; size = std::min(size, remaining_size_to_read); impl->buffer().resize(size); } @@ -675,6 +669,22 @@ off_t CachedReadBufferFromRemoteFS::getPosition() return file_offset_of_buffer_end - available(); } +std::optional CachedReadBufferFromRemoteFS::getLastNonDownloadedOffset() const +{ + if (!file_segments_holder) + throw Exception(ErrorCodes::LOGICAL_ERROR, "File segments holder not initialized"); + + const auto & file_segments = file_segments_holder->file_segments; + for (auto it = file_segments.rbegin(); it != file_segments.rend(); ++it) + { + const auto & file_segment = *it; + if (file_segment->state() != FileSegment::State::DOWNLOADED) + return file_segment->range().right; + } + + return std::nullopt; +} + String CachedReadBufferFromRemoteFS::getInfoForLog() { return fmt::format("Buffer path: {}, hash key: {}, file_offset_of_buffer_end: {}, internal buffer remaining read range: {}, file segment info: {}", diff --git a/src/Disks/IO/CachedReadBufferFromRemoteFS.h b/src/Disks/IO/CachedReadBufferFromRemoteFS.h index 02e35fd8fe4..068c7ecd676 100644 --- a/src/Disks/IO/CachedReadBufferFromRemoteFS.h +++ b/src/Disks/IO/CachedReadBufferFromRemoteFS.h @@ -38,6 +38,8 @@ private: SeekableReadBufferPtr getCacheReadBuffer(size_t offset) const; + std::optional getLastNonDownloadedOffset() const; + enum class ReadType { CACHED,