From c2c35278ba38971c5674767f1bf260ef80903aea Mon Sep 17 00:00:00 2001 From: kssenii Date: Tue, 6 Sep 2022 13:30:02 +0200 Subject: [PATCH 1/4] Refactor cache for async download --- .../IO/CachedOnDiskReadBufferFromFile.cpp | 207 ++++--- src/Disks/IO/CachedOnDiskReadBufferFromFile.h | 4 +- src/Interpreters/Cache/FileCache.cpp | 150 +++-- src/Interpreters/Cache/FileCache.h | 44 +- src/Interpreters/Cache/FileCache_fwd.h | 1 + src/Interpreters/Cache/FileSegment.cpp | 546 ++++++++++-------- src/Interpreters/Cache/FileSegment.h | 200 ++++--- .../tests/gtest_lru_file_cache.cpp | 57 +- 8 files changed, 678 insertions(+), 531 deletions(-) diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 21ed465e205..4306cf7ae4d 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -118,7 +118,11 @@ void CachedOnDiskReadBufferFromFile::initialize(size_t offset, size_t size) } else { - file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size, is_persistent)); + CreateFileSegmentSettings create_settings{ + .is_persistent = is_persistent + }; + + file_segments_holder.emplace(cache->getOrSet(cache_key, offset, size, create_settings)); } /** @@ -156,7 +160,7 @@ CachedOnDiskReadBufferFromFile::getCacheReadBuffer(size_t offset) const } CachedOnDiskReadBufferFromFile::ImplementationBufferPtr -CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_) +CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment, ReadType read_type_) { switch (read_type_) { @@ -178,7 +182,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegmentPtr & file_segm * Implementation buffer from segment1 is passed to segment2 once segment1 is loaded. */ - auto remote_fs_segment_reader = file_segment->getRemoteFileReader(); + auto remote_fs_segment_reader = file_segment.getRemoteFileReader(); if (!remote_fs_segment_reader) { @@ -189,7 +193,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegmentPtr & file_segm ErrorCodes::CANNOT_USE_CACHE, "Cache cannot be used with a ReadBuffer which does not support right bounded reads"); - file_segment->setRemoteFileReader(remote_fs_segment_reader); + file_segment.setRemoteFileReader(remote_fs_segment_reader); } return remote_fs_segment_reader; @@ -201,8 +205,8 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegmentPtr & file_segm if (remote_file_reader && remote_file_reader->getFileOffsetOfBufferEnd() == file_offset_of_buffer_end) return remote_file_reader; - auto remote_fs_segment_reader = file_segment->extractRemoteFileReader(); - if (remote_fs_segment_reader) + auto remote_fs_segment_reader = file_segment.extractRemoteFileReader(); + if (remote_fs_segment_reader && file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd()) remote_file_reader = remote_fs_segment_reader; else remote_file_reader = implementation_buffer_creator(); @@ -217,6 +221,19 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegmentPtr & file_segm } } +bool CachedOnDiskReadBufferFromFile::canStartFromCache(size_t current_offset, const FileSegment & file_segment) +{ + /// segment{k} state: DOWNLOADING + /// cache: [______|___________ + /// ^ + /// first_non_downloaded_offset (in progress) + /// requested_range: [__________] + /// ^ + /// current_offset + size_t first_non_downloaded_offset = file_segment.getFirstNonDownloadedOffset(); + return first_non_downloaded_offset > current_offset; +} + CachedOnDiskReadBufferFromFile::ImplementationBufferPtr CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment) { @@ -236,7 +253,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil { LOG_DEBUG(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used"); read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - return getRemoteFSReadBuffer(file_segment, read_type); + return getRemoteFSReadBuffer(*file_segment, read_type); } } @@ -248,21 +265,16 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil { LOG_DEBUG(log, "Bypassing cache because file segment state is `SKIP_CACHE`"); read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - return getRemoteFSReadBuffer(file_segment, read_type); + return getRemoteFSReadBuffer(*file_segment, read_type); } case FileSegment::State::DOWNLOADING: { - size_t download_offset = file_segment->getDownloadOffset(); - bool can_start_from_cache = download_offset > file_offset_of_buffer_end; - - /// If file segment is being downloaded but we can already read - /// from already downloaded part, do that. - if (can_start_from_cache) + if (canStartFromCache(file_offset_of_buffer_end, *file_segment)) { /// segment{k} state: DOWNLOADING /// cache: [______|___________ /// ^ - /// download_offset (in progress) + /// first_non_downloaded_offset (in progress) /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end @@ -282,12 +294,12 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil case FileSegment::State::EMPTY: case FileSegment::State::PARTIALLY_DOWNLOADED: { - if (file_segment->getDownloadOffset() > file_offset_of_buffer_end) + if (file_segment->getFirstNonDownloadedOffset() > file_offset_of_buffer_end) { /// segment{k} state: PARTIALLY_DOWNLOADED /// cache: [______|___________ /// ^ - /// download_offset (in progress) + /// first_non_downloaded_offset (in progress) /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end @@ -299,20 +311,12 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil auto downloader_id = file_segment->getOrSetDownloader(); if (downloader_id == file_segment->getCallerId()) { - size_t download_offset = file_segment->getDownloadOffset(); - bool can_start_from_cache = download_offset > file_offset_of_buffer_end; - - LOG_TEST( - log, - "Current download offset: {}, file offset of buffer end: {}", - download_offset, file_offset_of_buffer_end); - - if (can_start_from_cache) + if (canStartFromCache(file_offset_of_buffer_end, *file_segment)) { /// segment{k} /// cache: [______|___________ /// ^ - /// download_offset + /// first_non_downloaded_offset /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end @@ -322,27 +326,24 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil return getCacheReadBuffer(range.left); } - if (download_offset < file_offset_of_buffer_end) + if (file_segment->getCurrentWriteOffset() < file_offset_of_buffer_end) { /// segment{1} /// cache: [_____|___________ /// ^ - /// download_offset + /// current_write_offset /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end - assert(file_offset_of_buffer_end > file_segment->getDownloadOffset()); - bytes_to_predownload = file_offset_of_buffer_end - file_segment->getDownloadOffset(); - assert(bytes_to_predownload < range.size()); + LOG_TEST(log, "Predownload. File segment info: {}", file_segment->getInfoForLog()); + chassert(file_offset_of_buffer_end > file_segment->getCurrentWriteOffset()); + bytes_to_predownload = file_offset_of_buffer_end - file_segment->getCurrentWriteOffset(); + chassert(bytes_to_predownload < range.size()); } - download_offset = file_segment->getDownloadOffset(); - can_start_from_cache = download_offset > file_offset_of_buffer_end; - assert(!can_start_from_cache); - read_type = ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; - return getRemoteFSReadBuffer(file_segment, read_type); + return getRemoteFSReadBuffer(*file_segment, read_type); } download_state = file_segment->state(); @@ -350,10 +351,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil } case FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: { - size_t download_offset = file_segment->getDownloadOffset(); - bool can_start_from_cache = download_offset > file_offset_of_buffer_end; - - if (can_start_from_cache) + if (canStartFromCache(file_offset_of_buffer_end, *file_segment)) { read_type = ReadType::CACHED; return getCacheReadBuffer(range.left); @@ -364,7 +362,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil log, "Bypassing cache because file segment state is `PARTIALLY_DOWNLOADED_NO_CONTINUATION` and downloaded part already used"); read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - return getRemoteFSReadBuffer(file_segment, read_type); + return getRemoteFSReadBuffer(*file_segment, read_type); } } } @@ -374,8 +372,8 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil CachedOnDiskReadBufferFromFile::ImplementationBufferPtr CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_segment) { - assert(!file_segment->isDownloader()); - assert(file_offset_of_buffer_end >= file_segment->range().left); + chassert(!file_segment->isDownloader()); + chassert(file_offset_of_buffer_end >= file_segment->range().left); auto range = file_segment->range(); bytes_to_predownload = 0; @@ -389,10 +387,10 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se ProfileEvents::FileSegmentWaitReadBufferMicroseconds, watch.elapsedMicroseconds()); [[maybe_unused]] auto download_current_segment = read_type == ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE; - assert(download_current_segment == file_segment->isDownloader()); + chassert(download_current_segment == file_segment->isDownloader()); - assert(file_segment->range() == range); - assert(file_offset_of_buffer_end >= range.left && file_offset_of_buffer_end <= range.right); + chassert(file_segment->range() == range); + chassert(file_offset_of_buffer_end >= range.left && file_offset_of_buffer_end <= range.right); LOG_TEST( log, @@ -441,12 +439,12 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se } case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE: { - assert(file_segment->isDownloader()); + chassert(file_segment->isDownloader()); if (bytes_to_predownload) { - size_t download_offset = file_segment->getDownloadOffset(); - read_buffer_for_file_segment->seek(download_offset, SEEK_SET); + size_t current_write_offset = file_segment->getCurrentWriteOffset(); + read_buffer_for_file_segment->seek(current_write_offset, SEEK_SET); } else { @@ -456,18 +454,15 @@ CachedOnDiskReadBufferFromFile::getImplementationBuffer(FileSegmentPtr & file_se assert(static_cast(read_buffer_for_file_segment->getFileOffsetOfBufferEnd()) == file_offset_of_buffer_end); } - auto download_offset = file_segment->getDownloadOffset(); - if (download_offset != static_cast(read_buffer_for_file_segment->getPosition())) + auto current_write_offset = file_segment->getCurrentWriteOffset(); + if (current_write_offset != static_cast(read_buffer_for_file_segment->getPosition())) { throw Exception( ErrorCodes::LOGICAL_ERROR, - "Buffer's offsets mismatch; cached buffer offset: {}, download_offset: {}, " - "position: {}, implementation buffer remaining read range: {}, file segment info: {}", - file_offset_of_buffer_end, - download_offset, - read_buffer_for_file_segment->getPosition(), - read_buffer_for_file_segment->getRemainingReadRange().toString(), - file_segment->getInfoForLog()); + "Buffer's offsets mismatch. Cached buffer offset: {}, current_write_offset: {} implementation buffer offset: {}, " + "implementation buffer remaining range: {}, file segment info: {}", + file_offset_of_buffer_end, current_write_offset, read_buffer_for_file_segment->getPosition(), + read_buffer_for_file_segment->getRemainingReadRange().toString(), file_segment->getInfoForLog()); } break; @@ -488,7 +483,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext() auto & file_segment = *file_segment_it; [[maybe_unused]] const auto & range = file_segment->range(); - assert(file_offset_of_buffer_end > range.right); + chassert(file_offset_of_buffer_end > range.right); LOG_TEST( log, @@ -499,10 +494,8 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext() /// Do not hold pointer to file segment if it is not needed anymore /// so can become releasable and can be evicted from cache. - /// If the status of filesegment state is SKIP_CACHE, it will not be deleted. - /// It will be deleted from the cache when the holder is destructed. - if ((*file_segment_it)->state() != FileSegment::State::SKIP_CACHE) - file_segments_holder->file_segments.erase(file_segment_it); + file_segment->completeWithoutState(); + file_segments_holder->file_segments.erase(file_segment_it); if (current_file_segment_it == file_segments_holder->file_segments.end()) return false; @@ -545,8 +538,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) /// download from offset a'' < a', but return buffer from offset a'. LOG_TEST(log, "Bytes to predownload: {}, caller_id: {}", bytes_to_predownload, FileSegment::getCallerId()); - assert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment->getDownloadOffset()); - size_t current_offset = file_segment->getDownloadOffset(); + chassert(implementation_buffer->getFileOffsetOfBufferEnd() == file_segment->getCurrentWriteOffset()); + size_t current_offset = file_segment->getCurrentWriteOffset(); const auto & current_range = file_segment->range(); while (true) @@ -572,7 +565,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) "current download offset: {}, expected: {}, eof: {}", bytes_to_predownload, current_range.toString(), - file_segment->getDownloadOffset(), + file_segment->getCurrentWriteOffset(), file_offset_of_buffer_end, implementation_buffer->eof()); @@ -582,18 +575,20 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) { nextimpl_working_buffer_offset = implementation_buffer->offset(); - auto download_offset = file_segment->getDownloadOffset(); - if (download_offset != static_cast(implementation_buffer->getPosition()) - || download_offset != file_offset_of_buffer_end) + auto current_write_offset = file_segment->getCurrentWriteOffset(); + if (current_write_offset != static_cast(implementation_buffer->getPosition()) + || current_write_offset != file_offset_of_buffer_end) + { throw Exception( ErrorCodes::LOGICAL_ERROR, "Buffer's offsets mismatch after predownloading; download offset: {}, " "cached buffer offset: {}, implementation buffer offset: {}, " "file segment info: {}", - download_offset, + current_write_offset, file_offset_of_buffer_end, implementation_buffer->getPosition(), file_segment->getInfoForLog()); + } } break; @@ -609,7 +604,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) { LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size); - assert(file_segment->getDownloadOffset() == static_cast(implementation_buffer->getPosition())); + chassert(file_segment->getCurrentWriteOffset() == static_cast(implementation_buffer->getPosition())); bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, *file_segment); if (success) @@ -635,7 +630,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) /// segment{1} /// cache: [_____|___________ /// ^ - /// download_offset + /// current_write_offset /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end @@ -649,17 +644,18 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment) bytes_to_predownload = 0; file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); - LOG_TEST(log, "Bypassing cache because space reservation failed"); + LOG_TEST(log, "Bypassing cache because for {}", file_segment->getInfoForLog()); + read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; swap(*implementation_buffer); resetWorkingBuffer(); - implementation_buffer = getRemoteFSReadBuffer(file_segment, read_type); + implementation_buffer = getRemoteFSReadBuffer(*file_segment, read_type); swap(*implementation_buffer); - implementation_buffer->setReadUntilPosition(current_range.right + 1); /// [..., range.right] + implementation_buffer->setReadUntilPosition(file_segment->range().right + 1); /// [..., range.right] implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET); LOG_TEST( @@ -680,8 +676,8 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded() auto current_read_range = file_segment->range(); auto current_state = file_segment->state(); - assert(current_read_range.left <= file_offset_of_buffer_end); - assert(!file_segment->isDownloader()); + chassert(current_read_range.left <= file_offset_of_buffer_end); + chassert(!file_segment->isDownloader()); if (file_offset_of_buffer_end > current_read_range.right) { @@ -695,13 +691,15 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded() /// segment{k} /// cache: [______|___________ /// ^ - /// download_offset + /// current_write_offset /// requested_range: [__________] /// ^ /// file_offset_of_buffer_end - size_t download_offset = file_segment->getDownloadOffset(); - bool cached_part_is_finished = download_offset == file_offset_of_buffer_end; + auto current_write_offset = file_segment->getCurrentWriteOffset(); + bool cached_part_is_finished = current_write_offset == file_offset_of_buffer_end; + + LOG_TEST(log, "Current write offset: {}, file offset of buffer end: {}", current_write_offset, file_offset_of_buffer_end); if (cached_part_is_finished) { @@ -710,12 +708,12 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded() return true; } - else if (download_offset < file_offset_of_buffer_end) + else if (current_write_offset < file_offset_of_buffer_end) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Expected {} >= {} ({})", - download_offset, file_offset_of_buffer_end, getInfoForLog()); + current_write_offset, file_offset_of_buffer_end, getInfoForLog()); } } @@ -725,7 +723,7 @@ bool CachedOnDiskReadBufferFromFile::updateImplementationBufferIfNeeded() * ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE means that on previous getImplementationBuffer() call * current buffer successfully called file_segment->getOrSetDownloader() and became a downloader * for this file segment. However, the downloader's term has a lifespan of 1 nextImpl() call, - * e.g. downloader reads buffer_size byte and calls completeBatchAndResetDownloader() and some other + * e.g. downloader reads buffer_size byte and calls completePartAndResetDownloader() and some other * thread can become a downloader if it calls getOrSetDownloader() faster. * * So downloader is committed to download only buffer_size bytes and then is not a downloader anymore, @@ -817,11 +815,11 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() if (need_complete_file_segment) { LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader()); - file_segment->completeBatchAndResetDownloader(); + file_segment->completePartAndResetDownloader(); } } - assert(!file_segment->isDownloader()); + chassert(!file_segment->isDownloader()); } catch (...) { @@ -845,7 +843,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() (*current_file_segment_it)->incrementHitsCount(); } - assert(!internal_buffer.empty()); + chassert(!internal_buffer.empty()); swap(*implementation_buffer); @@ -854,15 +852,14 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() LOG_TEST( log, - "Current segment: {}, downloader: {}, current count: {}, position: {}, read range: {}", - current_read_range.toString(), - file_segment->getDownloader(), + "Current count: {}, position: {}, read range: {}, file segment: {}", implementation_buffer->count(), implementation_buffer->getPosition(), - implementation_buffer->getRemainingReadRange().toString()); + implementation_buffer->getRemainingReadRange().toString(), + file_segment->getInfoForLog()); - assert(current_read_range.left <= file_offset_of_buffer_end); - assert(current_read_range.right >= file_offset_of_buffer_end); + chassert(current_read_range.left <= file_offset_of_buffer_end); + chassert(current_read_range.right >= file_offset_of_buffer_end); bool result = false; size_t size = 0; @@ -939,24 +936,26 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() { if (download_current_segment) { - assert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right); + chassert(file_offset_of_buffer_end + size - 1 <= file_segment->range().right); bool success = file_segment->reserve(size); if (success) { - assert(file_segment->getDownloadOffset() == static_cast(implementation_buffer->getPosition())); + chassert(file_segment->getCurrentWriteOffset() == static_cast(implementation_buffer->getPosition())); success = writeCache(implementation_buffer->position(), size, file_offset_of_buffer_end, *file_segment); if (success) { - assert(file_segment->getDownloadOffset() <= file_segment->range().right + 1); - assert( + chassert(file_segment->getCurrentWriteOffset() <= file_segment->range().right + 1); + chassert( std::next(current_file_segment_it) == file_segments_holder->file_segments.end() - || file_segment->getDownloadOffset() == implementation_buffer->getFileOffsetOfBufferEnd()); + || file_segment->getCurrentWriteOffset() == implementation_buffer->getFileOffsetOfBufferEnd()); + + LOG_TEST(log, "Successfully written {} bytes", size); } else { - assert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); LOG_TEST(log, "Bypassing cache because writeCache method failed"); } } @@ -984,7 +983,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() size_t remaining_size_to_read = std::min(current_read_range.right, read_until_position - 1) - file_offset_of_buffer_end + 1; size = std::min(size, remaining_size_to_read); - assert(implementation_buffer->buffer().size() >= nextimpl_working_buffer_offset + size); + chassert(implementation_buffer->buffer().size() >= nextimpl_working_buffer_offset + size); implementation_buffer->buffer().resize(nextimpl_working_buffer_offset + size); } @@ -996,15 +995,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() current_file_segment_counters.increment(ProfileEvents::FileSegmentUsedBytes, available()); if (download_current_segment) - file_segment->completeBatchAndResetDownloader(); + file_segment->completePartAndResetDownloader(); - assert(!file_segment->isDownloader()); + chassert(!file_segment->isDownloader()); LOG_TEST( log, "Key: {}. Returning with {} bytes, buffer position: {} (offset: {}, predownloaded: {}), " "buffer available: {}, current range: {}, current offset: {}, file segment state: {}, " - "download offset: {}, read_type: {}, reading until position: {}, started with offset: {}, " + "current write offset: {}, read_type: {}, reading until position: {}, started with offset: {}, " "remaining ranges: {}", getHexUIntLowercase(cache_key), working_buffer.size(), @@ -1015,7 +1014,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() current_read_range.toString(), file_offset_of_buffer_end, FileSegment::stateToString(file_segment->state()), - file_segment->getDownloadOffset(), + file_segment->getCurrentWriteOffset(), toString(read_type), read_until_position, first_offset, diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h index ed623272c12..b86e53ec160 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.h +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.h @@ -80,7 +80,7 @@ private: void assertCorrectness() const; - std::shared_ptr getRemoteFSReadBuffer(FileSegmentPtr & file_segment, ReadType read_type_); + std::shared_ptr getRemoteFSReadBuffer(FileSegment & file_segment, ReadType read_type_); size_t getTotalSizeToRead(); @@ -90,6 +90,8 @@ private: bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment); + static bool canStartFromCache(size_t current_offset, const FileSegment & file_segment); + Poco::Logger * log; FileCache::Key cache_key; String source_file_path; diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index f51df9ae737..20a9f6cce1d 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -122,7 +122,6 @@ void FileCache::initialize() fs::create_directories(cache_base_path); } - status_file = make_unique(fs::path(cache_base_path) / "status", StatusFile::write_full_info); is_initialized = true; } } @@ -258,7 +257,7 @@ FileSegments FileCache::splitRangeIntoCells( size_t offset, size_t size, FileSegment::State state, - bool is_persistent, + const CreateFileSegmentSettings & settings, std::lock_guard & cache_lock) { assert(size > 0); @@ -275,7 +274,7 @@ FileSegments FileCache::splitRangeIntoCells( current_cell_size = std::min(remaining_size, max_file_segment_size); remaining_size -= current_cell_size; - auto * cell = addCell(key, current_pos, current_cell_size, state, is_persistent, cache_lock); + auto * cell = addCell(key, current_pos, current_cell_size, state, settings, cache_lock); if (cell) file_segments.push_back(cell->file_segment); assert(cell); @@ -292,7 +291,7 @@ void FileCache::fillHolesWithEmptyFileSegments( const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, - bool is_persistent, + const CreateFileSegmentSettings & settings, std::lock_guard & cache_lock) { /// There are segments [segment1, ..., segmentN] @@ -339,16 +338,16 @@ void FileCache::fillHolesWithEmptyFileSegments( if (fill_with_detached_file_segments) { - auto file_segment = std::make_shared(current_pos, hole_size, key, this, FileSegment::State::EMPTY); + auto file_segment = std::make_shared(current_pos, hole_size, key, this, FileSegment::State::EMPTY, settings); { - std::lock_guard segment_lock(file_segment->mutex); - file_segment->markAsDetached(segment_lock); + std::unique_lock segment_lock(file_segment->mutex); + file_segment->detachAssumeStateFinalized(segment_lock); } file_segments.insert(it, file_segment); } else { - file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, is_persistent, cache_lock)); + file_segments.splice(it, splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, settings, cache_lock)); } current_pos = segment_range.right + 1; @@ -366,22 +365,23 @@ void FileCache::fillHolesWithEmptyFileSegments( if (fill_with_detached_file_segments) { - auto file_segment = std::make_shared(current_pos, hole_size, key, this, FileSegment::State::EMPTY); + auto file_segment = std::make_shared(current_pos, hole_size, key, this, FileSegment::State::EMPTY, settings); { - std::lock_guard segment_lock(file_segment->mutex); - file_segment->markAsDetached(segment_lock); + std::unique_lock segment_lock(file_segment->mutex); + file_segment->detachAssumeStateFinalized(segment_lock); } file_segments.insert(file_segments.end(), file_segment); } else { file_segments.splice( - file_segments.end(), splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, is_persistent, cache_lock)); + file_segments.end(), + splitRangeIntoCells(key, current_pos, hole_size, FileSegment::State::EMPTY, settings, cache_lock)); } } } -FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent) +FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings) { std::lock_guard cache_lock(mutex); @@ -398,11 +398,11 @@ FileSegmentsHolder FileCache::getOrSet(const Key & key, size_t offset, size_t si if (file_segments.empty()) { - file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, is_persistent, cache_lock); + file_segments = splitRangeIntoCells(key, offset, size, FileSegment::State::EMPTY, settings, cache_lock); } else { - fillHolesWithEmptyFileSegments(file_segments, key, range, /* fill_with_detached */false, is_persistent, cache_lock); + fillHolesWithEmptyFileSegments(file_segments, key, range, /* fill_with_detached */false, settings, cache_lock); } assert(!file_segments.empty()); @@ -426,16 +426,17 @@ FileSegmentsHolder FileCache::get(const Key & key, size_t offset, size_t size) if (file_segments.empty()) { - auto file_segment = std::make_shared(offset, size, key, this, FileSegment::State::EMPTY); + auto file_segment = std::make_shared( + offset, size, key, this, FileSegment::State::EMPTY, CreateFileSegmentSettings{}); { - std::lock_guard segment_lock(file_segment->mutex); - file_segment->markAsDetached(segment_lock); + std::unique_lock segment_lock(file_segment->mutex); + file_segment->detachAssumeStateFinalized(segment_lock); } file_segments = { file_segment }; } else { - fillHolesWithEmptyFileSegments(file_segments, key, range, /* fill_with_detached */true, /* is_persistent */false, cache_lock); + fillHolesWithEmptyFileSegments(file_segments, key, range, /* fill_with_detached */true, {}, cache_lock); } return FileSegmentsHolder(std::move(file_segments)); @@ -443,7 +444,7 @@ FileSegmentsHolder FileCache::get(const Key & key, size_t offset, size_t size) FileCache::FileSegmentCell * FileCache::addCell( const Key & key, size_t offset, size_t size, - FileSegment::State state, bool is_persistent, + FileSegment::State state, const CreateFileSegmentSettings & settings, std::lock_guard & cache_lock) { /// Create a file segment cell and put it in `files` map by [key][offset]. @@ -475,18 +476,23 @@ FileCache::FileSegmentCell * FileCache::addCell( stash_records.erase({remove_priority_iter->key(), remove_priority_iter->offset()}); remove_priority_iter->removeAndGetNext(cache_lock); } - /// For segments that do not reach the download threshold, we do not download them, but directly read them + + /// For segments that do not reach the download threshold, + /// we do not download them, but directly read them result_state = FileSegment::State::SKIP_CACHE; } else { auto priority_iter = record->second; priority_iter->use(cache_lock); - result_state = priority_iter->hits() >= enable_cache_hits_threshold ? FileSegment::State::EMPTY : FileSegment::State::SKIP_CACHE; + + result_state = priority_iter->hits() >= enable_cache_hits_threshold + ? FileSegment::State::EMPTY + : FileSegment::State::SKIP_CACHE; } } - return std::make_shared(offset, size, key, this, result_state, is_persistent); + return std::make_shared(offset, size, key, this, result_state, settings); }; FileSegmentCell cell(skip_or_download(), this, cache_lock); @@ -495,6 +501,7 @@ FileCache::FileSegmentCell * FileCache::addCell( if (offsets.empty()) { auto key_path = getPathInLocalCache(key); + if (!fs::exists(key_path)) fs::create_directories(key_path); } @@ -513,7 +520,7 @@ FileSegmentPtr FileCache::createFileSegmentForDownload( const Key & key, size_t offset, size_t size, - bool is_persistent, + const CreateFileSegmentSettings & settings, std::lock_guard & cache_lock) { #ifndef NDEBUG @@ -530,7 +537,7 @@ FileSegmentPtr FileCache::createFileSegmentForDownload( "Cache cell already exists for key `{}` and offset {}", key.toString(), offset); - cell = addCell(key, offset, size, FileSegment::State::EMPTY, is_persistent, cache_lock); + cell = addCell(key, offset, size, FileSegment::State::EMPTY, settings, cache_lock); if (!cell) throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to add a new cell for download"); @@ -542,18 +549,21 @@ bool FileCache::tryReserve(const Key & key, size_t offset, size_t size, std::loc { auto query_context = enable_filesystem_query_cache_limit ? getCurrentQueryContext(cache_lock) : nullptr; if (!query_context) + { return tryReserveForMainList(key, offset, size, nullptr, cache_lock); - + } /// The maximum cache capacity of the request is not reached, thus the //// cache block is evicted from the main LRU queue by tryReserveForMainList(). else if (query_context->getCacheSize() + size <= query_context->getMaxCacheSize()) + { return tryReserveForMainList(key, offset, size, query_context, cache_lock); - + } /// When skip_download_if_exceeds_query_cache is true, there is no need /// to evict old data, skip the cache and read directly from remote fs. else if (query_context->isSkipDownloadIfExceed()) + { return false; - + } /// The maximum cache size of the query is reached, the cache will be /// evicted from the history cache accessed by the current query. else @@ -833,7 +843,7 @@ void FileCache::removeIfExists(const Key & key) auto file_segment = cell->file_segment; if (file_segment) { - std::lock_guard segment_lock(file_segment->mutex); + std::unique_lock segment_lock(file_segment->mutex); file_segment->detach(cache_lock, segment_lock); remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); } @@ -863,9 +873,11 @@ void FileCache::removeIfReleasable() auto * cell = getCell(key, offset, cache_lock); if (!cell) + { throw Exception( ErrorCodes::LOGICAL_ERROR, "Cache is in inconsistent state: LRU queue contains entries with no cache cell"); + } if (cell->releasable()) { @@ -880,7 +892,7 @@ void FileCache::removeIfReleasable() for (auto & file_segment : to_remove) { - std::lock_guard segment_lock(file_segment->mutex); + std::unique_lock segment_lock(file_segment->mutex); file_segment->detach(cache_lock, segment_lock); remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); } @@ -896,13 +908,13 @@ void FileCache::removeIfReleasable() void FileCache::remove(FileSegmentPtr file_segment, std::lock_guard & cache_lock) { - std::lock_guard segment_lock(file_segment->mutex); + std::unique_lock segment_lock(file_segment->mutex); remove(file_segment->key(), file_segment->offset(), cache_lock, segment_lock); } void FileCache::remove( Key key, size_t offset, - std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) + std::lock_guard & cache_lock, std::unique_lock & /* segment_lock */) { LOG_DEBUG(log, "Remove from cache. Key: {}, offset: {}", key.toString(), offset); @@ -976,7 +988,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock { if (!key_it->is_directory()) { - LOG_DEBUG(log, "Unexpected file {} (not a directory), will skip it", key_it->path().string()); + LOG_DEBUG(log, "Unexpected file: {}. Expected a directory", key_it->path().string()); continue; } @@ -1012,7 +1024,10 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock if (tryReserve(key, offset, size, cache_lock)) { - auto * cell = addCell(key, offset, size, FileSegment::State::DOWNLOADED, is_persistent, cache_lock); + auto * cell = addCell( + key, offset, size, FileSegment::State::DOWNLOADED, + CreateFileSegmentSettings{ .is_persistent = is_persistent }, cache_lock); + if (cell) queue_entries.emplace_back(cell->queue_iterator, cell->file_segment); } @@ -1049,7 +1064,7 @@ void FileCache::loadCacheInfoIntoMemory(std::lock_guard & cache_lock void FileCache::reduceSizeToDownloaded( const Key & key, size_t offset, - std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) + std::lock_guard & cache_lock, std::unique_lock & segment_lock) { /** * In case file was partially downloaded and it's download cannot be continued @@ -1069,20 +1084,25 @@ void FileCache::reduceSizeToDownloaded( const auto & file_segment = cell->file_segment; size_t downloaded_size = file_segment->downloaded_size; - if (downloaded_size == file_segment->range().size()) + size_t full_size = file_segment->range().size(); + + if (downloaded_size == full_size) { throw Exception( ErrorCodes::LOGICAL_ERROR, - "Nothing to reduce, file segment fully downloaded, key: {}, offset: {}", - key.toString(), offset); + "Nothing to reduce, file segment fully downloaded: {}", + file_segment->getInfoForLogUnlocked(segment_lock)); } - cell->file_segment = std::make_shared(offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED); + cell->file_segment = std::make_shared( + offset, downloaded_size, key, this, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings{}); + + assert(file_segment->reserved_size == downloaded_size); } bool FileCache::isLastFileSegmentHolder( const Key & key, size_t offset, - std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */) + std::lock_guard & cache_lock, std::unique_lock & /* segment_lock */) { auto * cell = getCell(key, offset, cache_lock); @@ -1167,7 +1187,8 @@ FileCache::FileSegmentCell::FileSegmentCell( { case FileSegment::State::DOWNLOADED: { - queue_iterator = cache->main_priority->add(file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock); + queue_iterator = cache->main_priority->add( + file_segment->key(), file_segment->offset(), file_segment->range().size(), cache_lock); break; } case FileSegment::State::SKIP_CACHE: @@ -1246,14 +1267,41 @@ void FileCache::assertPriorityCorrectness(std::lock_guard & cache_lo ErrorCodes::LOGICAL_ERROR, "Cache is in inconsistent state: LRU queue contains entries with no cache cell (assertCorrectness())"); } - assert(cell->size() == size); + + if (cell->size() != size) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected {} == {} size ({})", + cell->size(), size, cell->file_segment->getInfoForLog()); + } + total_size += size; } + assert(total_size == main_priority->getCacheSize(cache_lock)); assert(main_priority->getCacheSize(cache_lock) <= max_size); assert(main_priority->getElementsNum(cache_lock) <= max_element_size); } +FileCache::QueryContextHolder::QueryContextHolder( + const String & query_id_, + FileCache * cache_, + FileCache::QueryContextPtr context_) + : query_id(query_id_) + , cache(cache_) + , context(context_) +{ +} + +FileCache::QueryContextHolder::~QueryContextHolder() +{ + /// If only the query_map and the current holder hold the context_query, + /// the query has been completed and the query_context is released. + if (context && context.use_count() == 2) + cache->removeQueryContext(query_id); +} + FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard & cache_lock) { if (!isQueryInitialized()) @@ -1362,22 +1410,4 @@ void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guar record->second->use(cache_lock); } -FileCache::QueryContextHolder::QueryContextHolder( - const String & query_id_, - FileCache * cache_, - FileCache::QueryContextPtr context_) - : query_id(query_id_) - , cache(cache_) - , context(context_) -{ -} - -FileCache::QueryContextHolder::~QueryContextHolder() -{ - /// If only the query_map and the current holder hold the context_query, - /// the query has been completed and the query_context is released. - if (context && context.use_count() == 2) - cache->removeQueryContext(query_id); -} - } diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 3f5a5c9e1c5..07aea230803 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -12,13 +12,14 @@ #include #include +#include +#include #include +#include +#include #include #include -#include -#include -#include -#include + namespace DB { @@ -43,7 +44,6 @@ public: ~FileCache() = default; - /// Restore cache from local filesystem. void initialize(); const String & getBasePath() const { return cache_base_path; } @@ -59,7 +59,7 @@ public: * As long as pointers to returned file segments are hold * it is guaranteed that these file segments are not removed from cache. */ - FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, bool is_persistent); + FileSegmentsHolder getOrSet(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings); /** * Segments in returned list are ordered in ascending order and represent a full contiguous @@ -104,7 +104,7 @@ public: const Key & key, size_t offset, size_t size, - bool is_persistent, + const CreateFileSegmentSettings & create_settings, std::lock_guard & cache_lock); FileSegments getSnapshot() const; @@ -132,21 +132,21 @@ public: private: String cache_base_path; - size_t max_size; - size_t max_element_size; - size_t max_file_segment_size; + const size_t max_size; + const size_t max_element_size; + const size_t max_file_segment_size; - bool allow_persistent_files; - size_t enable_cache_hits_threshold; - bool enable_filesystem_query_cache_limit; + const bool allow_persistent_files; + const size_t enable_cache_hits_threshold; + const bool enable_filesystem_query_cache_limit; + mutable std::mutex mutex; Poco::Logger * log; bool is_initialized = false; std::exception_ptr initialization_exception; - std::unique_ptr status_file; - mutable std::mutex mutex; + void assertInitialized(std::lock_guard & cache_lock) const; bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); @@ -154,7 +154,7 @@ private: Key key, size_t offset, std::lock_guard & cache_lock, - std::lock_guard & segment_lock); + std::unique_lock & segment_lock); void remove( FileSegmentPtr file_segment, @@ -164,15 +164,13 @@ private: const Key & key, size_t offset, std::lock_guard & cache_lock, - std::lock_guard & segment_lock); + std::unique_lock & segment_lock); void reduceSizeToDownloaded( const Key & key, size_t offset, std::lock_guard & cache_lock, - std::lock_guard & segment_lock); - - void assertInitialized(std::lock_guard & cache_lock) const; + std::unique_lock & segment_lock); struct FileSegmentCell : private boost::noncopyable { @@ -225,7 +223,7 @@ private: size_t offset, size_t size, FileSegment::State state, - bool is_persistent, + const CreateFileSegmentSettings & create_settings, std::lock_guard & cache_lock); static void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock); @@ -242,7 +240,7 @@ private: size_t offset, size_t size, FileSegment::State state, - bool is_persistent, + const CreateFileSegmentSettings & create_settings, std::lock_guard & cache_lock); String dumpStructureUnlocked(const Key & key_, std::lock_guard & cache_lock); @@ -252,7 +250,7 @@ private: const Key & key, const FileSegment::Range & range, bool fill_with_detached_file_segments, - bool is_persistent, + const CreateFileSegmentSettings & settings, std::lock_guard & cache_lock); size_t getUsedCacheSizeUnlocked(std::lock_guard & cache_lock) const; diff --git a/src/Interpreters/Cache/FileCache_fwd.h b/src/Interpreters/Cache/FileCache_fwd.h index 13f037783b0..25c16b4e840 100644 --- a/src/Interpreters/Cache/FileCache_fwd.h +++ b/src/Interpreters/Cache/FileCache_fwd.h @@ -12,5 +12,6 @@ class FileCache; using FileCachePtr = std::shared_ptr; struct FileCacheSettings; +struct CreateFileSegmentSettings; } diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 547e6849dd6..8a67dfc1dc6 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -1,10 +1,10 @@ #include "FileSegment.h" #include -#include +#include +#include #include #include -#include #include #include #include @@ -29,7 +29,7 @@ FileSegment::FileSegment( const Key & key_, FileCache * cache_, State download_state_, - bool is_persistent_) + const CreateFileSegmentSettings & settings) : segment_range(offset_, offset_ + size_ - 1) , download_state(download_state_) , file_key(key_) @@ -39,7 +39,7 @@ FileSegment::FileSegment( #else , log(&Poco::Logger::get("FileSegment")) #endif - , is_persistent(is_persistent_) + , is_persistent(settings.is_persistent) { /// On creation, file segment state can be EMPTY, DOWNLOADED, DOWNLOADING. switch (download_state) @@ -64,50 +64,86 @@ FileSegment::FileSegment( } default: { - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state"); + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state"); } } } +String FileSegment::getPathInLocalCache() const +{ + return cache->getPathInLocalCache(key(), offset(), isPersistent()); +} + FileSegment::State FileSegment::state() const { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); return download_state; } -size_t FileSegment::getDownloadOffset() const +void FileSegment::setDownloadState(State state) +{ + LOG_TEST(log, "Updated state from {} to {}", stateToString(download_state), stateToString(state)); + download_state = state; +} + +size_t FileSegment::getFirstNonDownloadedOffset() const +{ + std::unique_lock segment_lock(mutex); + return getFirstNonDownloadedOffsetUnlocked(segment_lock); +} + +size_t FileSegment::getFirstNonDownloadedOffsetUnlocked(std::unique_lock & segment_lock) const { - std::lock_guard segment_lock(mutex); return range().left + getDownloadedSizeUnlocked(segment_lock); } +size_t FileSegment::getCurrentWriteOffset() const +{ + std::unique_lock segment_lock(mutex); + return getCurrentWriteOffsetUnlocked(segment_lock); +} + +size_t FileSegment::getCurrentWriteOffsetUnlocked(std::unique_lock & segment_lock) const +{ + return getFirstNonDownloadedOffsetUnlocked(segment_lock); +} + size_t FileSegment::getDownloadedSize() const { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); return getDownloadedSizeUnlocked(segment_lock); } -size_t FileSegment::getRemainingSizeToDownload() const -{ - std::lock_guard segment_lock(mutex); - return range().size() - getDownloadedSizeUnlocked(segment_lock); -} - -bool FileSegment::isDetached() const -{ - std::lock_guard segment_lock(mutex); - return is_detached; -} - -size_t FileSegment::getDownloadedSizeUnlocked(std::lock_guard & /* segment_lock */) const +size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock & /* segment_lock */) const { if (download_state == State::DOWNLOADED) return downloaded_size; - std::lock_guard download_lock(download_mutex); + std::unique_lock download_lock(download_mutex); return downloaded_size; } +size_t FileSegment::getAvailableSizeUnlocked(std::unique_lock & segment_lock) const +{ + auto current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); + chassert(reserved_size >= current_downloaded_size); + return reserved_size - current_downloaded_size; +} + +size_t FileSegment::getRemainingSizeToDownload() const +{ + std::unique_lock segment_lock(mutex); + return range().size() - getDownloadedSizeUnlocked(segment_lock); +} + +bool FileSegment::isDownloaded() const +{ + std::lock_guard segment_lock(mutex); + return is_downloaded; +} + String FileSegment::getCallerId() { if (!CurrentThread::isInitialized() @@ -118,84 +154,108 @@ String FileSegment::getCallerId() return std::string(CurrentThread::getQueryId()) + ":" + toString(getThreadId()); } +String FileSegment::getDownloader() const +{ + std::unique_lock segment_lock(mutex); + return getDownloaderUnlocked(segment_lock); +} + +String FileSegment::getDownloaderUnlocked(std::unique_lock & /* segment_lock */) const +{ + return downloader_id; +} + String FileSegment::getOrSetDownloader() { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); - assertNotDetached(segment_lock); + assertNotDetachedUnlocked(segment_lock); - if (downloader_id.empty()) + auto current_downloader = getDownloaderUnlocked(segment_lock); + + if (current_downloader.empty()) { - assert(download_state != State::DOWNLOADING); - - if (download_state != State::EMPTY - && download_state != State::PARTIALLY_DOWNLOADED) + bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED; + if (!allow_new_downloader) return "None"; - downloader_id = getCallerId(); - download_state = State::DOWNLOADING; - } - else if (downloader_id == getCallerId()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to set the same downloader for segment {} for the second time", range().toString()); + chassert(download_state != State::DOWNLOADING); - return downloader_id; + current_downloader = downloader_id = getCallerId(); + setDownloadState(State::DOWNLOADING); + } + + return current_downloader; +} + +void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] std::unique_lock & segment_lock) +{ + assert(isDownloaderUnlocked(segment_lock)); + assert(download_state == State::DOWNLOADING); + + size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); + /// range().size() can equal 0 in case of write-though cache. + if (current_downloaded_size != 0 && current_downloaded_size == range().size()) + setDownloadedUnlocked(segment_lock); + else + setDownloadState(State::PARTIALLY_DOWNLOADED); } void FileSegment::resetDownloader() { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); - assertNotDetached(segment_lock); + assertNotDetachedUnlocked(segment_lock); + assertIsDownloaderUnlocked("resetDownloader", segment_lock); - if (downloader_id.empty()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "There is no downloader"); - - if (getCallerId() != downloader_id) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Downloader can be reset only by downloader"); - - resetDownloaderImpl(segment_lock); + resetDownloadingStateUnlocked(segment_lock); + resetDownloaderUnlocked(segment_lock); } -void FileSegment::resetDownloaderImpl(std::lock_guard & segment_lock) +void FileSegment::resetDownloaderUnlocked(std::unique_lock & /* segment_lock */) { - if (getDownloadedSizeUnlocked(segment_lock) == range().size()) - setDownloaded(segment_lock); - else - download_state = State::PARTIALLY_DOWNLOADED; - + LOG_TEST(log, "Resetting downloader from {}", downloader_id); downloader_id.clear(); } -String FileSegment::getDownloader() const +void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock & segment_lock) const { - std::lock_guard segment_lock(mutex); - return downloader_id; + auto caller = getCallerId(); + auto current_downloader = getDownloaderUnlocked(segment_lock); + LOG_TEST(log, "Downloader id: {}, caller id: {}", current_downloader, caller); + + if (caller != current_downloader) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Operation `{}` can be done only by downloader. " + "(CallerId: {}, downloader id: {})", + operation, caller, downloader_id); + } } bool FileSegment::isDownloader() const { - std::lock_guard segment_lock(mutex); - return getCallerId() == downloader_id; + std::unique_lock segment_lock(mutex); + return isDownloaderUnlocked(segment_lock); } -bool FileSegment::isDownloaderImpl(std::lock_guard & /* segment_lock */) const +bool FileSegment::isDownloaderUnlocked(std::unique_lock & segment_lock) const { - return getCallerId() == downloader_id; + return getCallerId() == getDownloaderUnlocked(segment_lock); } FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader() { - if (!isDownloader()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader"); - + std::unique_lock segment_lock(mutex); + assertIsDownloaderUnlocked("getRemoteFileReader", segment_lock); return remote_file_reader; } FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader() { std::lock_guard cache_lock(cache->mutex); - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); if (!is_detached) { @@ -210,8 +270,8 @@ FileSegment::RemoteFileReaderPtr FileSegment::extractRemoteFileReader() void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_) { - if (!isDownloader()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader"); + std::unique_lock segment_lock(mutex); + assertIsDownloaderUnlocked("setRemoteFileReader", segment_lock); if (remote_file_reader) throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader already exists"); @@ -221,8 +281,8 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_) void FileSegment::resetRemoteFileReader() { - if (!isDownloader()) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Only downloader can use remote filesystem file reader"); + std::unique_lock segment_lock(mutex); + assertIsDownloaderUnlocked("resetRemoteFileReader", segment_lock); if (!remote_file_reader) throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist"); @@ -230,45 +290,47 @@ void FileSegment::resetRemoteFileReader() remote_file_reader.reset(); } -void FileSegment::write(const char * from, size_t size, size_t offset_) +void FileSegment::write(const char * from, size_t size, size_t offset) { if (!size) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Writing zero size is not allowed"); - if (availableSize() < size) - throw Exception( - ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Not enough space is reserved. Available: {}, expected: {}", availableSize(), size); - - if (!isDownloader()) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Only downloader can do the downloading. (CallerId: {}, DownloaderId: {})", - getCallerId(), downloader_id); - - if (getDownloadedSize() == range().size()) - throw Exception( - ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded", - size, offset_); - - auto download_offset = range().left + downloaded_size; - if (offset_ != download_offset) - throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to write {} bytes to offset: {}, but current download offset is {}", - size, offset_, download_offset); - { - std::lock_guard segment_lock(mutex); - assertNotDetached(segment_lock); + std::unique_lock segment_lock(mutex); + + assertIsDownloaderUnlocked("write", segment_lock); + assertNotDetachedUnlocked(segment_lock); + + if (download_state != State::DOWNLOADING) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected DOWNLOADING state, got {}", stateToString(download_state)); + + size_t first_non_downloaded_offset = getFirstNonDownloadedOffsetUnlocked(segment_lock); + if (offset != first_non_downloaded_offset) + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Attempt to write {} bytes to offset: {}, but current write offset is {}", + size, offset, first_non_downloaded_offset); + + size_t available_size = getAvailableSizeUnlocked(segment_lock); + if (available_size < size) + throw Exception( + ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, + "Not enough space is reserved. Available: {}, expected: {}", available_size, size); + + if (getDownloadedSizeUnlocked(segment_lock) == range().size()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded"); } if (!cache_writer) { - if (downloaded_size > 0) - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cache writer was finalized (downloaded size: {}, state: {})", - downloaded_size, stateToString(download_state)); + auto current_downloaded_size = getDownloadedSize(); + if (current_downloaded_size > 0) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cache writer was finalized (downloaded size: {}, state: {})", + current_downloaded_size, stateToString(download_state)); auto download_path = getPathInLocalCache(); cache_writer = std::make_unique(download_path); @@ -278,7 +340,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) { cache_writer->write(from, size); - std::lock_guard download_lock(download_mutex); + std::unique_lock download_lock(download_mutex); cache_writer->next(); @@ -286,23 +348,21 @@ void FileSegment::write(const char * from, size_t size, size_t offset_) } catch (Exception & e) { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); wrapWithCacheInfo(e, "while writing into cache", segment_lock); - setDownloadFailed(segment_lock); + setDownloadFailedUnlocked(segment_lock); cv.notify_all(); throw; } - assert(getDownloadOffset() == offset_ + size); -} - -String FileSegment::getPathInLocalCache() const -{ - return cache->getPathInLocalCache(key(), offset(), isPersistent()); +#ifndef NDEBUG + std::unique_lock segment_lock(mutex); + chassert(getFirstNonDownloadedOffsetUnlocked(segment_lock) == offset + size); +#endif } FileSegment::State FileSegment::wait() @@ -324,8 +384,8 @@ FileSegment::State FileSegment::wait() { LOG_TEST(log, "{} waiting on: {}, current downloader: {}", getCallerId(), range().toString(), downloader_id); - assert(!downloader_id.empty()); - assert(downloader_id != getCallerId()); + chassert(!getDownloaderUnlocked(segment_lock).empty()); + chassert(!isDownloaderUnlocked(segment_lock)); cv.wait_for(segment_lock, std::chrono::seconds(60)); } @@ -338,30 +398,23 @@ bool FileSegment::reserve(size_t size_to_reserve) if (!size_to_reserve) throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Zero space reservation is not allowed"); + size_t expected_downloaded_size; + { - std::lock_guard segment_lock(mutex); - assertNotDetached(segment_lock); + std::unique_lock segment_lock(mutex); - auto caller_id = getCallerId(); - bool is_downloader = caller_id == downloader_id; - if (!is_downloader) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Space can be reserved only by downloader (current: {}, expected: {})", - caller_id, downloader_id); - } + assertNotDetachedUnlocked(segment_lock); + assertIsDownloaderUnlocked("reserve", segment_lock); - size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); - if (current_downloaded_size + size_to_reserve > range().size()) - { + expected_downloaded_size = getDownloadedSizeUnlocked(segment_lock); + + if (expected_downloaded_size + size_to_reserve > range().size()) throw Exception( ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Attempt to reserve space too much space: {} ({})", - size_to_reserve, getInfoForLogImpl(segment_lock)); - } + "Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {})", + size_to_reserve, range().toString(), downloaded_size); - assert(reserved_size >= current_downloaded_size); + chassert(reserved_size >= expected_downloaded_size); } /** @@ -370,9 +423,7 @@ bool FileSegment::reserve(size_t size_to_reserve) * and the caller is going to continue; */ - size_t current_downloaded_size = getDownloadedSize(); - assert(reserved_size >= current_downloaded_size); - size_t already_reserved_size = reserved_size - current_downloaded_size; + size_t already_reserved_size = reserved_size - expected_downloaded_size; bool reserved = already_reserved_size >= size_to_reserve; if (!reserved) @@ -392,23 +443,13 @@ bool FileSegment::reserve(size_t size_to_reserve) return reserved; } -bool FileSegment::isDownloaded() const -{ - std::lock_guard segment_lock(mutex); - return isDownloadedUnlocked(segment_lock); -} - -bool FileSegment::isDownloadedUnlocked(std::lock_guard & /* segment_lock */) const -{ - return is_downloaded; -} - -void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard & segment_lock) +void FileSegment::setDownloadedUnlocked([[maybe_unused]] std::unique_lock & segment_lock) { if (is_downloaded) return; - downloader_id.clear(); + setDownloadState(State::DOWNLOADED); + is_downloaded = true; if (cache_writer) { @@ -424,10 +465,12 @@ void FileSegment::setDownloaded([[maybe_unused]] std::lock_guard & s assert(std::filesystem::file_size(getPathInLocalCache()) > 0); } -void FileSegment::setDownloadFailed(std::lock_guard & /* segment_lock */) +void FileSegment::setDownloadFailedUnlocked(std::unique_lock & segment_lock) { - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; - downloader_id.clear(); + LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(segment_lock)); + + setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + resetDownloaderUnlocked(segment_lock); if (cache_writer) { @@ -437,43 +480,31 @@ void FileSegment::setDownloadFailed(std::lock_guard & /* segment_loc } } -void FileSegment::completeBatchAndResetDownloader() +void FileSegment::completePartAndResetDownloader() { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); + completePartAndResetDownloaderUnlocked(segment_lock); +} - assertNotDetached(segment_lock); +void FileSegment::completePartAndResetDownloaderUnlocked(std::unique_lock & segment_lock) +{ + assertNotDetachedUnlocked(segment_lock); + assertIsDownloaderUnlocked("completePartAndResetDownloader", segment_lock); - if (!isDownloaderImpl(segment_lock)) - { - cv.notify_all(); - throw Exception( - ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "File segment can be completed only by downloader ({} != {})", - downloader_id, getCallerId()); - } - - resetDownloaderImpl(segment_lock); - - LOG_TEST(log, "Complete batch. Current downloaded size: {}", getDownloadedSizeUnlocked(segment_lock)); + resetDownloadingStateUnlocked(segment_lock); + resetDownloaderUnlocked(segment_lock); + LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(segment_lock)); cv.notify_all(); } void FileSegment::completeWithState(State state) { std::lock_guard cache_lock(cache->mutex); - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); - assertNotDetached(segment_lock); - - auto caller_id = getCallerId(); - if (caller_id != downloader_id) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "File segment completion can be done only by downloader. (CallerId: {}, downloader id: {}", - caller_id, downloader_id); - } + assertNotDetachedUnlocked(segment_lock); + assertIsDownloaderUnlocked("complete", segment_lock); if (state != State::DOWNLOADED && state != State::PARTIALLY_DOWNLOADED @@ -485,24 +516,29 @@ void FileSegment::completeWithState(State state) "Cannot complete file segment with state: {}", stateToString(state)); } - download_state = state; + setDownloadState(state); completeBasedOnCurrentState(cache_lock, segment_lock); } -void FileSegment::completeWithoutState(std::lock_guard & cache_lock) +void FileSegment::completeWithoutState() { - std::lock_guard segment_lock(mutex); + std::lock_guard cache_lock(cache->mutex); + completeWithoutStateUnlocked(cache_lock); +} + +void FileSegment::completeWithoutStateUnlocked(std::lock_guard & cache_lock) +{ + std::unique_lock segment_lock(mutex); completeBasedOnCurrentState(cache_lock, segment_lock); } -void FileSegment::completeBasedOnCurrentState(std::lock_guard & cache_lock, std::lock_guard & segment_lock) +void FileSegment::completeBasedOnCurrentState(std::lock_guard & cache_lock, std::unique_lock & segment_lock) { if (is_detached) return; - bool is_downloader = isDownloaderImpl(segment_lock); + bool is_downloader = isDownloaderUnlocked(segment_lock); bool is_last_holder = cache->isLastFileSegmentHolder(key(), offset(), cache_lock, segment_lock); - bool can_update_segment_state = is_downloader || is_last_holder; size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); SCOPE_EXIT({ @@ -512,16 +548,15 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach } }); - LOG_TEST(log, "Complete without state (is_last_holder: {}). File segment info: {}", is_last_holder, getInfoForLogImpl(segment_lock)); + LOG_TEST( + log, + "Complete based on current state (is_last_holder: {}, {})", + is_last_holder, getInfoForLogUnlocked(segment_lock)); - if (can_update_segment_state) + if (is_downloader) { - if (current_downloaded_size == range().size()) - setDownloaded(segment_lock); - else - download_state = State::PARTIALLY_DOWNLOADED; - - resetDownloaderImpl(segment_lock); + resetDownloadingStateUnlocked(segment_lock); + resetDownloaderUnlocked(segment_lock); } switch (download_state) @@ -535,16 +570,16 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach } case State::DOWNLOADED: { - assert(getDownloadedSizeUnlocked(segment_lock) == range().size()); - assert(isDownloadedUnlocked(segment_lock)); + chassert(getDownloadedSizeUnlocked(segment_lock) == range().size()); + assert(is_downloaded); break; } case State::DOWNLOADING: - case State::EMPTY: { - assert(!is_last_holder); + chassert(!is_last_holder); break; } + case State::EMPTY: case State::PARTIALLY_DOWNLOADED: case State::PARTIALLY_DOWNLOADED_NO_CONTINUATION: { @@ -554,7 +589,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach { LOG_TEST(log, "Remove cell {} (nothing downloaded)", range().toString()); - download_state = State::SKIP_CACHE; + setDownloadState(State::SKIP_CACHE); cache->remove(key(), offset(), cache_lock, segment_lock); } else @@ -567,7 +602,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach * in FileSegmentsHolder represent a contiguous range, so we can resize * it only when nobody needs it. */ - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; + setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); /// Resize this file segment by creating a copy file segment with DOWNLOADED state, /// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state, @@ -576,23 +611,22 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach cache->reduceSizeToDownloaded(key(), offset(), cache_lock, segment_lock); } - markAsDetached(segment_lock); + detachAssumeStateFinalized(segment_lock); } break; } } - LOG_TEST(log, "Completed file segment: {}", getInfoForLogImpl(segment_lock)); - assertCorrectnessImpl(segment_lock); + LOG_TEST(log, "Completed file segment: {}", getInfoForLogUnlocked(segment_lock)); } String FileSegment::getInfoForLog() const { - std::lock_guard segment_lock(mutex); - return getInfoForLogImpl(segment_lock); + std::unique_lock segment_lock(mutex); + return getInfoForLogUnlocked(segment_lock); } -String FileSegment::getInfoForLogImpl(std::lock_guard & segment_lock) const +String FileSegment::getInfoForLogUnlocked(std::unique_lock & segment_lock) const { WriteBufferFromOwnString info; info << "File segment: " << range().toString() << ", "; @@ -601,15 +635,18 @@ String FileSegment::getInfoForLogImpl(std::lock_guard & segment_lock info << "downloaded size: " << getDownloadedSizeUnlocked(segment_lock) << ", "; info << "reserved size: " << reserved_size << ", "; info << "downloader id: " << (downloader_id.empty() ? "None" : downloader_id) << ", "; + info << "current write offset: " << getCurrentWriteOffsetUnlocked(segment_lock) << ", "; + info << "first non-downloaded offset: " << getFirstNonDownloadedOffsetUnlocked(segment_lock) << ", "; info << "caller id: " << getCallerId() << ", "; + info << "detached: " << is_detached << ", "; info << "persistent: " << is_persistent; return info.str(); } -void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const +void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, std::unique_lock & segment_lock) const { - e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogImpl(segment_lock))); + e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(segment_lock))); } String FileSegment::stateToString(FileSegment::State state) @@ -634,63 +671,65 @@ String FileSegment::stateToString(FileSegment::State state) void FileSegment::assertCorrectness() const { - std::lock_guard segment_lock(mutex); - assertCorrectnessImpl(segment_lock); + std::unique_lock segment_lock(mutex); + assertCorrectnessUnlocked(segment_lock); } -void FileSegment::assertCorrectnessImpl(std::lock_guard & /* segment_lock */) const +void FileSegment::assertCorrectnessUnlocked(std::unique_lock & segment_lock) const { - assert(downloader_id.empty() == (download_state != FileSegment::State::DOWNLOADING)); - assert(!downloader_id.empty() == (download_state == FileSegment::State::DOWNLOADING)); - assert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0); + // auto current_downloader = getDownloaderUnlocked(false, segment_lock); + LOG_TEST(log, "Checking correctness: {}", getInfoForLogUnlocked(segment_lock)); + // chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING)); + // chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING)); + // chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0); } -void FileSegment::throwIfDetached() const -{ - std::lock_guard segment_lock(mutex); - throwIfDetachedUnlocked(segment_lock); -} - -void FileSegment::throwIfDetachedUnlocked(std::lock_guard & segment_lock) const +void FileSegment::throwIfDetachedUnlocked(std::unique_lock & segment_lock) const { throw Exception( ErrorCodes::LOGICAL_ERROR, "Cache file segment is in detached state, operation not allowed. " "It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. " - "Please, retry. File segment info: {}", getInfoForLogImpl(segment_lock)); + "Please, retry. File segment info: {}", getInfoForLogUnlocked(segment_lock)); } +void FileSegment::assertNotDetached() const +{ + std::unique_lock segment_lock(mutex); + assertNotDetachedUnlocked(segment_lock); +} -void FileSegment::assertNotDetached(std::lock_guard & segment_lock) const +void FileSegment::assertNotDetachedUnlocked(std::unique_lock & segment_lock) const { if (is_detached) throwIfDetachedUnlocked(segment_lock); } -void FileSegment::assertDetachedStatus(std::lock_guard & segment_lock) const +void FileSegment::assertDetachedStatus(std::unique_lock & segment_lock) const { /// Detached file segment is allowed to have only a certain subset of states. /// It should be either EMPTY or one of the finalized states. - if (download_state != State::EMPTY && !hasFinalizedState()) + if (download_state != State::EMPTY && !hasFinalizedStateUnlocked(segment_lock)) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Detached file segment has incorrect state: {}", - getInfoForLogImpl(segment_lock)); + getInfoForLogUnlocked(segment_lock)); } } FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard & /* cache_lock */) { - std::lock_guard segment_lock(file_segment->mutex); + std::unique_lock segment_lock(file_segment->mutex); auto snapshot = std::make_shared( file_segment->offset(), file_segment->range().size(), file_segment->key(), nullptr, - State::EMPTY); + State::EMPTY, + CreateFileSegmentSettings{}); snapshot->hits_count = file_segment->getHitsCount(); snapshot->ref_count = file_segment.use_count(); @@ -701,41 +740,40 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment, std return snapshot; } -bool FileSegment::hasFinalizedState() const +bool FileSegment::hasFinalizedStateUnlocked(std::unique_lock & /* segment_lock */) const { return download_state == State::DOWNLOADED || download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION || download_state == State::SKIP_CACHE; } -void FileSegment::detach( - std::lock_guard & /* cache_lock */, - std::lock_guard & segment_lock) +bool FileSegment::isDetached() const +{ + std::unique_lock segment_lock(mutex); + return is_detached; +} + +void FileSegment::detach(std::lock_guard & /* cache_lock */, std::unique_lock & segment_lock) { - /// Now detached status can be in 2 cases, which do not do any complex logic: - /// 1. there is only 1 remaining file segment holder - /// && it does not need this segment anymore - /// && this file segment was in cache and needs to be removed - /// 2. in read_from_cache_if_exists_otherwise_bypass_cache case if (is_detached) return; - markAsDetached(segment_lock); - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; - downloader_id.clear(); + resetDownloaderUnlocked(segment_lock); + setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); - LOG_DEBUG(log, "Detached file segment: {}", getInfoForLogImpl(segment_lock)); + detachAssumeStateFinalized(segment_lock); } -void FileSegment::markAsDetached(std::lock_guard & /* segment_lock */) +void FileSegment::detachAssumeStateFinalized(std::unique_lock & segment_lock) { is_detached = true; CurrentMetrics::add(CurrentMetrics::CacheDetachedFileSegments); + LOG_TEST(log, "Detached file segment: {}", getInfoForLogUnlocked(segment_lock)); } FileSegment::~FileSegment() { - std::lock_guard segment_lock(mutex); + std::unique_lock segment_lock(mutex); if (is_detached) CurrentMetrics::sub(CurrentMetrics::CacheDetachedFileSegments); } @@ -761,7 +799,7 @@ FileSegmentsHolder::~FileSegmentsHolder() bool is_detached = false; { - std::lock_guard segment_lock(file_segment->mutex); + std::unique_lock segment_lock(file_segment->mutex); is_detached = file_segment->isDetached(segment_lock); if (is_detached) file_segment->assertDetachedStatus(segment_lock); @@ -779,7 +817,7 @@ FileSegmentsHolder::~FileSegmentsHolder() /// under the same mutex, because complete() checks for segment pointers. std::lock_guard cache_lock(cache->mutex); - file_segment->completeWithoutState(cache_lock); + file_segment->completeWithoutStateUnlocked(cache_lock); file_segment_it = file_segments.erase(current_file_segment_it); } @@ -822,10 +860,16 @@ FileSegments::iterator FileSegmentRangeWriter::allocateFileSegment(size_t offset std::lock_guard cache_lock(cache->mutex); + CreateFileSegmentSettings create_settings + { + .is_persistent = is_persistent, + }; + /// We set max_file_segment_size to be downloaded, /// if we have less size to write, file segment will be resized in complete() method. auto file_segment = cache->createFileSegmentForDownload( - key, offset, cache->max_file_segment_size, is_persistent, cache_lock); + key, offset, cache->max_file_segment_size, create_settings, cache_lock); + return file_segments_holder.add(std::move(file_segment)); } @@ -859,10 +903,7 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) file_segment.reserved_size = current_downloaded_size; } - { - std::lock_guard cache_lock(cache->mutex); - file_segment.completeWithoutState(cache_lock); - } + file_segment.completeWithoutState(); on_complete_file_segment_func(file_segment); } @@ -893,18 +934,20 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset offset, current_file_segment_write_offset); } + size_t current_write_offset = (*current_file_segment_it)->getCurrentWriteOffset(); + auto current_file_segment = *current_file_segment_it; if (current_file_segment->getRemainingSizeToDownload() == 0) { completeFileSegment(*current_file_segment); current_file_segment_it = allocateFileSegment(current_file_segment_write_offset, is_persistent); } - else if (current_file_segment->getDownloadOffset() != offset) + else if (current_write_offset != offset) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Cannot file segment download offset {} does not match current write offset {}", - current_file_segment->getDownloadOffset(), offset); + current_write_offset, offset); } } @@ -915,7 +958,10 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset throw Exception(ErrorCodes::LOGICAL_ERROR, "Failed to set a downloader. ({})", file_segment->getInfoForLog()); SCOPE_EXIT({ - file_segment->resetDownloader(); + if (file_segment->isDownloader()) + { + file_segment->completePartAndResetDownloader(); + } }); bool reserved = file_segment->reserve(size); @@ -932,7 +978,17 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset return false; } - (*current_file_segment_it)->write(data, size, offset); + try + { + file_segment->write(data, size, offset); + } + catch (...) + { + file_segment->completePartAndResetDownloader(); + throw; + } + + file_segment->completePartAndResetDownloader(); current_file_segment_write_offset += size; return true; diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index f3fb367792a..9d9ddc3923a 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -1,11 +1,15 @@ #pragma once -#include +#include +#include #include #include +#include +#include +#include #include -#include +#include namespace Poco { class Logger; } @@ -26,17 +30,25 @@ using FileSegmentPtr = std::shared_ptr; using FileSegments = std::list; -class FileSegment : boost::noncopyable +struct CreateFileSegmentSettings +{ + bool is_persistent = false; +}; + +class FileSegment : private boost::noncopyable, public std::enable_shared_from_this { friend class FileCache; friend struct FileSegmentsHolder; friend class FileSegmentRangeWriter; +friend class StorageSystemFilesystemCache; public: using Key = FileCacheKey; using RemoteFileReaderPtr = std::shared_ptr; using LocalCacheWriterPtr = std::unique_ptr; + using Downloader = std::string; + using DownloaderId = std::string; enum class State { @@ -78,7 +90,7 @@ public: const Key & key_, FileCache * cache_, State download_state_, - bool is_persistent_ = false); + const CreateFileSegmentSettings & create_settings); ~FileSegment(); @@ -101,6 +113,14 @@ public: String toString() const { return fmt::format("[{}, {}]", std::to_string(left), std::to_string(right)); } }; + static String getCallerId(); + + String getInfoForLog() const; + + /** + * ========== Methods to get file segment's constant state ================== + */ + const Range & range() const { return segment_range; } const Key & key() const { return file_key; } @@ -109,11 +129,85 @@ public: bool isPersistent() const { return is_persistent; } + using UniqueId = std::pair; + UniqueId getUniqueId() const { return std::pair(key(), offset()); } + + String getPathInLocalCache() const; + + /** + * ========== Methods for _any_ file segment's owner ======================== + */ + + String getOrSetDownloader(); + + bool isDownloader() const; + + DownloaderId getDownloader() const; + + /// Wait for the change of state from DOWNLOADING to any other. State wait(); - bool reserve(size_t size); + bool isDownloaded() const; - void write(const char * from, size_t size, size_t offset_); + size_t getHitsCount() const { return hits_count; } + + size_t getRefCount() const { return ref_count; } + + void incrementHitsCount() { ++hits_count; } + + size_t getCurrentWriteOffset() const; + + size_t getFirstNonDownloadedOffset() const; + + size_t getDownloadedSize() const; + + /// Now detached status can be used in the following cases: + /// 1. there is only 1 remaining file segment holder + /// && it does not need this segment anymore + /// && this file segment was in cache and needs to be removed + /// 2. in read_from_cache_if_exists_otherwise_bypass_cache case to create NOOP file segments. + /// 3. removeIfExists - method which removes file segments from cache even though + /// it might be used at the moment. + + /// If file segment is detached it means the following: + /// 1. It is not present in FileCache, e.g. will not be visible to any cache user apart from + /// those who acquired shared pointer to this file segment before it was detached. + /// 2. Detached file segment can still be hold by some cache users, but it's state became + /// immutable at the point it was detached, any non-const / stateful method will throw an + /// exception. + void detach(std::lock_guard & cache_lock, std::unique_lock & segment_lock); + + static FileSegmentPtr getSnapshot(const FileSegmentPtr & file_segment, std::lock_guard & cache_lock); + + bool isDetached() const; + + void assertCorrectness() const; + + /** + * ========== Methods for _only_ file segment's `writer` ====================== + */ + + void synchronousWrite(const char * from, size_t size, size_t offset); + + /** + * ========== Methods for _only_ file segment's `downloader` ================== + */ + + /// Try to reserve exactly `size` bytes. + bool reserve(size_t size_to_reserve); + + /// Write data into reserved space. + void write(const char * from, size_t size, size_t offset); + + /// Complete file segment with a certain state. + void completeWithState(State state); + + void completeWithoutState(); + + /// Complete file segment's part which was last written. + void completePartAndResetDownloader(); + + void resetDownloader(); RemoteFileReaderPtr getRemoteFileReader(); @@ -123,91 +217,56 @@ public: void resetRemoteFileReader(); - String getOrSetDownloader(); - - String getDownloader() const; - - void resetDownloader(); - - bool isDownloader() const; - - bool isDownloaded() const; - - static String getCallerId(); - - size_t getDownloadOffset() const; - - size_t getDownloadedSize() const; - size_t getRemainingSizeToDownload() const; - void completeBatchAndResetDownloader(); - - void completeWithState(State state); - - String getInfoForLog() const; - - size_t getHitsCount() const { return hits_count; } - - size_t getRefCount() const { return ref_count; } - - void incrementHitsCount() { ++hits_count; } - - void assertCorrectness() const; - - static FileSegmentPtr getSnapshot( - const FileSegmentPtr & file_segment, - std::lock_guard & cache_lock); - - void detach( - std::lock_guard & cache_lock, - std::lock_guard & segment_lock); - - [[noreturn]] void throwIfDetached() const; - - bool isDetached() const; - - String getPathInLocalCache() const; - private: - size_t availableSize() const { return reserved_size - downloaded_size; } + size_t getFirstNonDownloadedOffsetUnlocked(std::unique_lock & segment_lock) const; + size_t getCurrentWriteOffsetUnlocked(std::unique_lock & segment_lock) const; + size_t getDownloadedSizeUnlocked(std::unique_lock & segment_lock) const; + size_t getAvailableSizeUnlocked(std::unique_lock & segment_lock) const; - size_t getDownloadedSizeUnlocked(std::lock_guard & segment_lock) const; - String getInfoForLogImpl(std::lock_guard & segment_lock) const; - void assertCorrectnessImpl(std::lock_guard & segment_lock) const; - bool hasFinalizedState() const; + String getInfoForLogUnlocked(std::unique_lock & segment_lock) const; - bool isDetached(std::lock_guard & /* segment_lock */) const { return is_detached; } - void markAsDetached(std::lock_guard & segment_lock); - [[noreturn]] void throwIfDetachedUnlocked(std::lock_guard & segment_lock) const; + String getDownloaderUnlocked(std::unique_lock & segment_lock) const; + void resetDownloaderUnlocked(std::unique_lock & segment_lock); + void resetDownloadingStateUnlocked(std::unique_lock & segment_lock); - void assertDetachedStatus(std::lock_guard & segment_lock) const; - void assertNotDetached(std::lock_guard & segment_lock) const; + void setDownloadState(State state); - void setDownloaded(std::lock_guard & segment_lock); - void setDownloadFailed(std::lock_guard & segment_lock); - bool isDownloaderImpl(std::lock_guard & segment_lock) const; + void setDownloadedUnlocked(std::unique_lock & segment_lock); + void setDownloadFailedUnlocked(std::unique_lock & segment_lock); - bool isDownloadedUnlocked(std::lock_guard & segment_lock) const; + bool hasFinalizedStateUnlocked(std::unique_lock & segment_lock) const; - void wrapWithCacheInfo(Exception & e, const String & message, std::lock_guard & segment_lock) const; + bool isDownloaderUnlocked(std::unique_lock & segment_lock) const; - bool lastFileSegmentHolder() const; + bool isDetached(std::unique_lock & /* segment_lock */) const { return is_detached; } + void detachAssumeStateFinalized(std::unique_lock & segment_lock); + [[noreturn]] void throwIfDetachedUnlocked(std::unique_lock & segment_lock) const; + + void assertDetachedStatus(std::unique_lock & segment_lock) const; + void assertNotDetached() const; + void assertNotDetachedUnlocked(std::unique_lock & segment_lock) const; + void assertIsDownloaderUnlocked(const std::string & operation, std::unique_lock & segment_lock) const; + void assertCorrectnessUnlocked(std::unique_lock & segment_lock) const; /// complete() without any completion state is called from destructor of /// FileSegmentsHolder. complete() might check if the caller of the method /// is the last alive holder of the segment. Therefore, complete() and destruction /// of the file segment pointer must be done under the same cache mutex. - void completeBasedOnCurrentState(std::lock_guard & cache_lock, std::lock_guard & segment_lock); - void completeWithoutState(std::lock_guard & cache_lock); + void completeWithoutStateUnlocked(std::lock_guard & cache_lock); + void completeBasedOnCurrentState(std::lock_guard & cache_lock, std::unique_lock & segment_lock); - void resetDownloaderImpl(std::lock_guard & segment_lock); + void completePartAndResetDownloaderUnlocked(std::unique_lock & segment_lock); + + void wrapWithCacheInfo(Exception & e, const String & message, std::unique_lock & segment_lock) const; Range segment_range; State download_state; - String downloader_id; + /// The one who prepares the download + DownloaderId downloader_id; RemoteFileReaderPtr remote_file_reader; LocalCacheWriterPtr cache_writer; @@ -245,6 +304,7 @@ private: std::atomic ref_count = 0; /// Used for getting snapshot state bool is_persistent; + CurrentMetrics::Increment metric_increment{CurrentMetrics::CacheFileSegments}; }; diff --git a/src/Interpreters/tests/gtest_lru_file_cache.cpp b/src/Interpreters/tests/gtest_lru_file_cache.cpp index 6460eeef8c5..22150b9f656 100644 --- a/src/Interpreters/tests/gtest_lru_file_cache.cpp +++ b/src/Interpreters/tests/gtest_lru_file_cache.cpp @@ -3,9 +3,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -64,7 +64,7 @@ void download(DB::FileSegmentPtr file_segment) fs::create_directories(subdir); std::string data(size, '0'); - file_segment->write(data.data(), size, file_segment->getDownloadOffset()); + file_segment->write(data.data(), size, file_segment->getCurrentWriteOffset()); } void prepareAndDownload(DB::FileSegmentPtr file_segment) @@ -89,6 +89,7 @@ TEST(FileCache, get) { if (fs::exists(cache_base_path)) fs::remove_all(cache_base_path); + fs::create_directories(cache_base_path); DB::ThreadStatus thread_status; @@ -109,7 +110,7 @@ TEST(FileCache, get) auto key = cache.hash("key1"); { - auto holder = cache.getOrSet(key, 0, 10, false); /// Add range [0, 9] + auto holder = cache.getOrSet(key, 0, 10, {}); /// Add range [0, 9] auto segments = fromHolder(holder); /// Range was not present in cache. It should be added in cache as one while file segment. ASSERT_EQ(segments.size(), 1); @@ -138,7 +139,7 @@ TEST(FileCache, get) { /// Want range [5, 14], but [0, 9] already in cache, so only [10, 14] will be put in cache. - auto holder = cache.getOrSet(key, 5, 10, false); + auto holder = cache.getOrSet(key, 5, 10, {}); auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 2); @@ -158,14 +159,14 @@ TEST(FileCache, get) ASSERT_EQ(cache.getUsedCacheSize(), 15); { - auto holder = cache.getOrSet(key, 9, 1, false); /// Get [9, 9] + auto holder = cache.getOrSet(key, 9, 1, {}); /// Get [9, 9] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); assertRange(7, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); } { - auto holder = cache.getOrSet(key, 9, 2, false); /// Get [9, 10] + auto holder = cache.getOrSet(key, 9, 2, {}); /// Get [9, 10] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 2); assertRange(8, segments[0], DB::FileSegment::Range(0, 9), DB::FileSegment::State::DOWNLOADED); @@ -173,15 +174,15 @@ TEST(FileCache, get) } { - auto holder = cache.getOrSet(key, 10, 1, false); /// Get [10, 10] + auto holder = cache.getOrSet(key, 10, 1, {}); /// Get [10, 10] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); assertRange(10, segments[0], DB::FileSegment::Range(10, 14), DB::FileSegment::State::DOWNLOADED); } - complete(cache.getOrSet(key, 17, 4, false)); /// Get [17, 20] - complete(cache.getOrSet(key, 24, 3, false)); /// Get [24, 26] - /// complete(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27] + complete(cache.getOrSet(key, 17, 4, {})); /// Get [17, 20] + complete(cache.getOrSet(key, 24, 3, {})); /// Get [24, 26] + /// completeWithState(cache.getOrSet(key, 27, 1, false)); /// Get [27, 27] /// Current cache: [__________][_____] [____] [___][] /// ^ ^^ ^ ^ ^ ^ ^^^ @@ -191,7 +192,7 @@ TEST(FileCache, get) ASSERT_EQ(cache.getUsedCacheSize(), 22); { - auto holder = cache.getOrSet(key, 0, 26, false); /// Get [0, 25] + auto holder = cache.getOrSet(key, 0, 26, {}); /// Get [0, 25] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 6); @@ -225,14 +226,14 @@ TEST(FileCache, get) /// as max elements size is reached, next attempt to put something in cache should fail. /// This will also check that [27, 27] was indeed evicted. - auto holder1 = cache.getOrSet(key, 27, 1, false); + auto holder1 = cache.getOrSet(key, 27, 1, {}); auto segments_1 = fromHolder(holder1); /// Get [27, 27] ASSERT_EQ(segments_1.size(), 1); assertRange(17, segments_1[0], DB::FileSegment::Range(27, 27), DB::FileSegment::State::EMPTY); } { - auto holder = cache.getOrSet(key, 12, 10, false); /// Get [12, 21] + auto holder = cache.getOrSet(key, 12, 10, {}); /// Get [12, 21] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 4); @@ -256,7 +257,7 @@ TEST(FileCache, get) ASSERT_EQ(cache.getFileSegmentsNum(), 5); { - auto holder = cache.getOrSet(key, 23, 5, false); /// Get [23, 28] + auto holder = cache.getOrSet(key, 23, 5, {}); /// Get [23, 28] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 3); @@ -277,12 +278,12 @@ TEST(FileCache, get) /// 17 21 2324 26 28 { - auto holder5 = cache.getOrSet(key, 2, 3,false); /// Get [2, 4] + auto holder5 = cache.getOrSet(key, 2, 3, {}); /// Get [2, 4] auto s5 = fromHolder(holder5); ASSERT_EQ(s5.size(), 1); assertRange(25, s5[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::EMPTY); - auto holder1 = cache.getOrSet(key, 30, 2, false); /// Get [30, 31] + auto holder1 = cache.getOrSet(key, 30, 2, {}); /// Get [30, 31] auto s1 = fromHolder(holder1); ASSERT_EQ(s1.size(), 1); assertRange(26, s1[0], DB::FileSegment::Range(30, 31), DB::FileSegment::State::EMPTY); @@ -298,20 +299,20 @@ TEST(FileCache, get) /// ^ ^ ^ ^ ^ ^ ^ ^ /// 2 4 23 24 26 27 30 31 - auto holder2 = cache.getOrSet(key, 23, 1, false); /// Get [23, 23] + auto holder2 = cache.getOrSet(key, 23, 1, {}); /// Get [23, 23] auto s2 = fromHolder(holder2); ASSERT_EQ(s2.size(), 1); - auto holder3 = cache.getOrSet(key, 24, 3, false); /// Get [24, 26] + auto holder3 = cache.getOrSet(key, 24, 3, {}); /// Get [24, 26] auto s3 = fromHolder(holder3); ASSERT_EQ(s3.size(), 1); - auto holder4 = cache.getOrSet(key, 27, 1, false); /// Get [27, 27] + auto holder4 = cache.getOrSet(key, 27, 1, {}); /// Get [27, 27] auto s4 = fromHolder(holder4); ASSERT_EQ(s4.size(), 1); /// All cache is now unreleasable because pointers are still hold - auto holder6 = cache.getOrSet(key, 0, 40, false); + auto holder6 = cache.getOrSet(key, 0, 40, {}); auto f = fromHolder(holder6); ASSERT_EQ(f.size(), 9); @@ -332,7 +333,7 @@ TEST(FileCache, get) } { - auto holder = cache.getOrSet(key, 2, 3, false); /// Get [2, 4] + auto holder = cache.getOrSet(key, 2, 3, {}); /// Get [2, 4] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 1); assertRange(31, segments[0], DB::FileSegment::Range(2, 4), DB::FileSegment::State::DOWNLOADED); @@ -343,7 +344,7 @@ TEST(FileCache, get) /// 2 4 23 24 26 27 30 31 { - auto holder = cache.getOrSet(key, 25, 5, false); /// Get [25, 29] + auto holder = cache.getOrSet(key, 25, 5, {}); /// Get [25, 29] auto segments = fromHolder(holder); ASSERT_EQ(segments.size(), 3); @@ -367,7 +368,7 @@ TEST(FileCache, get) DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1); thread_status_1.attachQueryContext(query_context_1); - auto holder_2 = cache.getOrSet(key, 25, 5, false); /// Get [25, 29] once again. + auto holder_2 = cache.getOrSet(key, 25, 5, {}); /// Get [25, 29] once again. auto segments_2 = fromHolder(holder_2); ASSERT_EQ(segments.size(), 3); @@ -406,11 +407,11 @@ TEST(FileCache, get) { /// Now let's check the similar case but getting ERROR state after segment->wait(), when - /// state is changed not manually via segment->complete(state) but from destructor of holder + /// state is changed not manually via segment->completeWithState(state) but from destructor of holder /// and notify_all() is also called from destructor of holder. std::optional holder; - holder.emplace(cache.getOrSet(key, 3, 23, false)); /// Get [3, 25] + holder.emplace(cache.getOrSet(key, 3, 23, {})); /// Get [3, 25] auto segments = fromHolder(*holder); ASSERT_EQ(segments.size(), 3); @@ -436,7 +437,7 @@ TEST(FileCache, get) DB::CurrentThread::QueryScope query_scope_holder_1(query_context_1); thread_status_1.attachQueryContext(query_context_1); - auto holder_2 = cache.getOrSet(key, 3, 23, false); /// Get [3, 25] once again + auto holder_2 = cache.getOrSet(key, 3, 23, {}); /// Get [3, 25] once again auto segments_2 = fromHolder(*holder); ASSERT_EQ(segments_2.size(), 3); @@ -485,7 +486,7 @@ TEST(FileCache, get) cache2.initialize(); auto key = cache2.hash("key1"); - auto holder1 = cache2.getOrSet(key, 2, 28, false); /// Get [2, 29] + auto holder1 = cache2.getOrSet(key, 2, 28, {}); /// Get [2, 29] auto segments1 = fromHolder(holder1); ASSERT_EQ(segments1.size(), 5); @@ -506,7 +507,7 @@ TEST(FileCache, get) cache2.initialize(); auto key = cache2.hash("key1"); - auto holder1 = cache2.getOrSet(key, 0, 25, false); /// Get [0, 24] + auto holder1 = cache2.getOrSet(key, 0, 25, {}); /// Get [0, 24] auto segments1 = fromHolder(holder1); ASSERT_EQ(segments1.size(), 3); From 37bdba98b122abdfaa68027309f2dce9ff812c89 Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 9 Sep 2022 15:41:31 +0200 Subject: [PATCH 2/4] Fix --- src/Interpreters/Cache/FileSegment.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 8a67dfc1dc6..02452ab2a6d 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -894,13 +894,15 @@ void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment) /// and therefore cannot be concurrently accessed. Nevertheless, it can be /// accessed by cache system tables if someone read from them, /// therefore we need a mutex. - std::lock_guard segment_lock(file_segment.mutex); + std::unique_lock segment_lock(file_segment.mutex); assert(current_downloaded_size <= file_segment.range().size()); file_segment.segment_range = FileSegment::Range( file_segment.segment_range.left, file_segment.segment_range.left + current_downloaded_size - 1); file_segment.reserved_size = current_downloaded_size; + + file_segment.setDownloadedUnlocked(segment_lock); } file_segment.completeWithoutState(); From 9c2bbcf4a612626a7b4203df00157d91b83d0a6c Mon Sep 17 00:00:00 2001 From: kssenii Date: Sun, 11 Sep 2022 17:38:51 +0200 Subject: [PATCH 3/4] Fix --- src/Interpreters/Cache/FileSegment.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index 02452ab2a6d..e45faabb1e9 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -555,7 +555,8 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach if (is_downloader) { - resetDownloadingStateUnlocked(segment_lock); + if (download_state == State::DOWNLOADING) /// != in case of completeWithState + resetDownloadingStateUnlocked(segment_lock); resetDownloaderUnlocked(segment_lock); } @@ -572,6 +573,7 @@ void FileSegment::completeBasedOnCurrentState(std::lock_guard & cach { chassert(getDownloadedSizeUnlocked(segment_lock) == range().size()); assert(is_downloaded); + assert(!cache_writer); break; } case State::DOWNLOADING: From 413fbb6507f2225228e4b0a090001901168fe3e7 Mon Sep 17 00:00:00 2001 From: kssenii Date: Mon, 12 Sep 2022 14:39:52 +0200 Subject: [PATCH 4/4] Self-review fixes --- .../IO/CachedOnDiskReadBufferFromFile.cpp | 4 +- src/Interpreters/Cache/FileSegment.cpp | 62 +++++++++---------- src/Interpreters/Cache/FileSegment.h | 1 - 3 files changed, 30 insertions(+), 37 deletions(-) diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 4306cf7ae4d..fa4a79415ec 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -206,7 +206,7 @@ CachedOnDiskReadBufferFromFile::getRemoteFSReadBuffer(FileSegment & file_segment return remote_file_reader; auto remote_fs_segment_reader = file_segment.extractRemoteFileReader(); - if (remote_fs_segment_reader && file_offset_of_buffer_end == implementation_buffer->getFileOffsetOfBufferEnd()) + if (remote_fs_segment_reader && file_offset_of_buffer_end == remote_file_reader->getFileOffsetOfBufferEnd()) remote_file_reader = remote_fs_segment_reader; else remote_file_reader = implementation_buffer_creator(); @@ -294,7 +294,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil case FileSegment::State::EMPTY: case FileSegment::State::PARTIALLY_DOWNLOADED: { - if (file_segment->getFirstNonDownloadedOffset() > file_offset_of_buffer_end) + if (canStartFromCache(file_offset_of_buffer_end, *file_segment)) { /// segment{k} state: PARTIALLY_DOWNLOADED /// cache: [______|___________ diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index e45faabb1e9..e0cf581dd95 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -125,13 +125,6 @@ size_t FileSegment::getDownloadedSizeUnlocked(std::unique_lock & /* return downloaded_size; } -size_t FileSegment::getAvailableSizeUnlocked(std::unique_lock & segment_lock) const -{ - auto current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); - chassert(reserved_size >= current_downloaded_size); - return reserved_size - current_downloaded_size; -} - size_t FileSegment::getRemainingSizeToDownload() const { std::unique_lock segment_lock(mutex); @@ -177,9 +170,7 @@ String FileSegment::getOrSetDownloader() { bool allow_new_downloader = download_state == State::EMPTY || download_state == State::PARTIALLY_DOWNLOADED; if (!allow_new_downloader) - return "None"; - - chassert(download_state != State::DOWNLOADING); + return "notAllowed:" + stateToString(download_state); current_downloader = downloader_id = getCallerId(); setDownloadState(State::DOWNLOADING); @@ -313,27 +304,29 @@ void FileSegment::write(const char * from, size_t size, size_t offset) "Attempt to write {} bytes to offset: {}, but current write offset is {}", size, offset, first_non_downloaded_offset); - size_t available_size = getAvailableSizeUnlocked(segment_lock); - if (available_size < size) + size_t current_downloaded_size = getDownloadedSizeUnlocked(segment_lock); + chassert(reserved_size >= current_downloaded_size); + size_t free_reserved_size = reserved_size - current_downloaded_size; + + if (free_reserved_size < size) throw Exception( ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, - "Not enough space is reserved. Available: {}, expected: {}", available_size, size); + "Not enough space is reserved. Available: {}, expected: {}", free_reserved_size, size); - if (getDownloadedSizeUnlocked(segment_lock) == range().size()) + if (current_downloaded_size == range().size()) throw Exception(ErrorCodes::LOGICAL_ERROR, "File segment is already fully downloaded"); - } - if (!cache_writer) - { - auto current_downloaded_size = getDownloadedSize(); - if (current_downloaded_size > 0) - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cache writer was finalized (downloaded size: {}, state: {})", - current_downloaded_size, stateToString(download_state)); + if (!cache_writer) + { + if (current_downloaded_size > 0) + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cache writer was finalized (downloaded size: {}, state: {})", + current_downloaded_size, stateToString(download_state)); - auto download_path = getPathInLocalCache(); - cache_writer = std::make_unique(download_path); + auto download_path = getPathInLocalCache(); + cache_writer = std::make_unique(download_path); + } } try @@ -360,8 +353,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset) } #ifndef NDEBUG - std::unique_lock segment_lock(mutex); - chassert(getFirstNonDownloadedOffsetUnlocked(segment_lock) == offset + size); + chassert(getFirstNonDownloadedOffset() == offset + size); #endif } @@ -679,11 +671,10 @@ void FileSegment::assertCorrectness() const void FileSegment::assertCorrectnessUnlocked(std::unique_lock & segment_lock) const { - // auto current_downloader = getDownloaderUnlocked(false, segment_lock); - LOG_TEST(log, "Checking correctness: {}", getInfoForLogUnlocked(segment_lock)); - // chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING)); - // chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING)); - // chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0); + auto current_downloader = getDownloaderUnlocked(segment_lock); + chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING)); + chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING)); + chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0); } void FileSegment::throwIfDetachedUnlocked(std::unique_lock & segment_lock) const @@ -760,9 +751,12 @@ void FileSegment::detach(std::lock_guard & /* cache_lock */, std::un if (is_detached) return; - resetDownloaderUnlocked(segment_lock); - setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + if (download_state == State::DOWNLOADING) + resetDownloadingStateUnlocked(segment_lock); + else + setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); + resetDownloaderUnlocked(segment_lock); detachAssumeStateFinalized(segment_lock); } diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 9d9ddc3923a..9f6c3697960 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -223,7 +223,6 @@ private: size_t getFirstNonDownloadedOffsetUnlocked(std::unique_lock & segment_lock) const; size_t getCurrentWriteOffsetUnlocked(std::unique_lock & segment_lock) const; size_t getDownloadedSizeUnlocked(std::unique_lock & segment_lock) const; - size_t getAvailableSizeUnlocked(std::unique_lock & segment_lock) const; String getInfoForLogUnlocked(std::unique_lock & segment_lock) const;