diff --git a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp index 7827e537708..0cee265a11e 100644 --- a/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskReadBufferFromFile.cpp @@ -598,8 +598,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment) chassert(file_segment.getCurrentWriteOffset(false) == static_cast(implementation_buffer->getPosition())); - bool success = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, file_segment); - if (success) + continue_predownload = writeCache(implementation_buffer->buffer().begin(), current_predownload_size, current_offset, file_segment); + if (continue_predownload) { current_offset += current_predownload_size; @@ -609,7 +609,6 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment) else { LOG_TEST(log, "Bypassing cache because writeCache (in predownload) method failed"); - continue_predownload = false; } } @@ -631,7 +630,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment) /// TODO: allow seek more than once with seek avoiding. bytes_to_predownload = 0; - file_segment.setBroken(); + file_segment.completePartAndResetDownloader(); chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); LOG_TEST(log, "Bypassing cache because for {}", file_segment.getInfoForLog()); @@ -945,13 +944,6 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep() { LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size); } - - if (!success) - { - file_segment.setBroken(); - read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE; - download_current_segment = false; - } } /// - If last file segment was read from remote fs, then we read up to segment->range().right, diff --git a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp index 9b351df041b..d72dcecb484 100644 --- a/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp +++ b/src/Disks/IO/CachedOnDiskWriteBufferFromFile.cpp @@ -88,7 +88,6 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset bool reserved = file_segment->reserve(size_to_write); if (!reserved) { - file_segment->setBroken(); appendFilesystemCacheLog(*file_segment); LOG_DEBUG( diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 9db634a8188..d50af057aed 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -558,7 +558,8 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size) auto iterate_func = [&](LockedKey & locked_key, FileSegmentMetadataPtr segment_metadata) { - chassert(segment_metadata->file_segment->getQueueIterator()); + chassert(segment_metadata->file_segment->assertCorrectness()); + const bool is_persistent = allow_persistent_files && segment_metadata->file_segment->isPersistent(); const bool releasable = segment_metadata->releasable() && !is_persistent; @@ -821,13 +822,13 @@ void FileCache::loadMetadata() auto file_segment_metadata_it = addFileSegment( *locked_key, offset, size, FileSegment::State::DOWNLOADED, CreateFileSegmentSettings(segment_kind), &lock); - chassert(file_segment_metadata_it->second->file_segment->getQueueIterator()); - chassert(file_segment_metadata_it->second->size() == size); + const auto & file_segment_metadata = file_segment_metadata_it->second; + chassert(file_segment_metadata->file_segment->assertCorrectness()); total_size += size; queue_entries.emplace_back( - file_segment_metadata_it->second->file_segment->getQueueIterator(), - file_segment_metadata_it->second->file_segment); + file_segment_metadata->getQueueIterator(), + file_segment_metadata->file_segment); } else { @@ -965,22 +966,14 @@ size_t FileCache::getFileSegmentsNum() const void FileCache::assertCacheCorrectness() { - metadata.iterate([&](const LockedKey & locked_key) + auto lock = cache_guard.lock(); + main_priority->iterate([&](LockedKey &, FileSegmentMetadataPtr segment_metadata) { - for (const auto & [offset, file_segment_metadata] : locked_key) - { - const auto & file_segment = *file_segment_metadata->file_segment; - - if (file_segment.key() != locked_key.getKey()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Expected {} = {}", file_segment.key(), locked_key.getKey()); - } - - file_segment.assertCorrectness(); - } - }); + const auto & file_segment = *segment_metadata->file_segment; + UNUSED(file_segment); + chassert(file_segment.assertCorrectness()); + return PriorityIterationResult::CONTINUE; + }, lock); } FileCache::QueryContextHolder::QueryContextHolder( diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index b06bc521ac0..7373e371f4c 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -32,7 +32,7 @@ FileSegment::FileSegment( const CreateFileSegmentSettings & settings, FileCache * cache_, std::weak_ptr key_metadata_, - CachePriorityIterator queue_iterator_) + Priority::Iterator queue_iterator_) : file_key(key_) , segment_range(offset_, offset_ + size_ - 1) , segment_kind(settings.kind) @@ -101,13 +101,13 @@ size_t FileSegment::getReservedSize() const return reserved_size; } -FileSegment::CachePriorityIterator FileSegment::getQueueIterator() const +FileSegment::Priority::Iterator FileSegment::getQueueIterator() const { auto lock = segment_guard.lock(); return queue_iterator; } -void FileSegment::setQueueIterator(CachePriorityIterator iterator) +void FileSegment::setQueueIterator(Priority::Iterator iterator) { auto lock = segment_guard.lock(); if (queue_iterator) @@ -355,8 +355,6 @@ void FileSegment::write(const char * from, size_t size, size_t offset) setDownloadFailedUnlocked(lock); - cv.notify_all(); - throw; } @@ -384,7 +382,7 @@ FileSegment::State FileSegment::wait(size_t offset) { return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true); }); - chassert(ok); + /// chassert(ok); } return download_state; @@ -457,18 +455,20 @@ bool FileSegment::reserve(size_t size_to_reserve) size_t already_reserved_size = reserved_size - expected_downloaded_size; bool reserved = already_reserved_size >= size_to_reserve; + if (reserved) + return reserved; + + size_to_reserve = size_to_reserve - already_reserved_size; + + /// This (resizable file segments) is allowed only for single threaded use of file segment. + /// Currently it is used only for temporary files through cache. + if (is_unbound && is_file_segment_size_exceeded) + segment_range.right = range().left + expected_downloaded_size + size_to_reserve; + + reserved = cache->tryReserve(*this, size_to_reserve); + if (!reserved) - { - size_to_reserve = size_to_reserve - already_reserved_size; - - /// This (resizable file segments) is allowed only for single threaded use of file segment. - /// Currently it is used only for temporary files through cache. - if (is_unbound && is_file_segment_size_exceeded) - segment_range.right = range().left + expected_downloaded_size + size_to_reserve; - - reserved = cache->tryReserve(*this, size_to_reserve); - chassert(assertCorrectness()); - } + setDownloadFailedUnlocked(segment_guard.lock()); return reserved; } @@ -516,27 +516,15 @@ void FileSegment::completePartAndResetDownloader() assertNotDetachedUnlocked(lock); assertIsDownloaderUnlocked("completePartAndResetDownloader", lock); - resetDownloadingStateUnlocked(lock); - resetDownloaderUnlocked(lock); - - LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock)); -} - -void FileSegment::setBroken() -{ - auto lock = segment_guard.lock(); - - SCOPE_EXIT({ cv.notify_all(); }); - - assertNotDetachedUnlocked(lock); - assertIsDownloaderUnlocked("setBroken", lock); + chassert(download_state == State::DOWNLOADING + || download_state == State::PARTIALLY_DOWNLOADED_NO_CONTINUATION); if (download_state == State::DOWNLOADING) resetDownloadingStateUnlocked(lock); - if (download_state != State::DOWNLOADED) - download_state = State::PARTIALLY_DOWNLOADED_NO_CONTINUATION; resetDownloaderUnlocked(lock); + + LOG_TEST(log, "Complete batch. ({})", getInfoForLogUnlocked(lock)); } void FileSegment::complete() @@ -700,24 +688,48 @@ String FileSegment::stateToString(FileSegment::State state) bool FileSegment::assertCorrectness() const { - auto lock = segment_guard.lock(); + return assertCorrectnessUnlocked(segment_guard.lock()); +} - auto current_downloader = getDownloaderUnlocked(lock); - chassert(current_downloader.empty() == (download_state != FileSegment::State::DOWNLOADING)); - chassert(!current_downloader.empty() == (download_state == FileSegment::State::DOWNLOADING)); - chassert(download_state != FileSegment::State::DOWNLOADED || std::filesystem::file_size(getPathInLocalCache()) > 0); - - chassert(reserved_size == 0 || queue_iterator); - if (queue_iterator) +bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const +{ + auto check_iterator = [this](const Priority::Iterator & it) { - const auto & entry = queue_iterator->getEntry(); - if (isCompleted(false)) - chassert(reserved_size == entry.size); - else - /// We cannot check == here because reserved_size is not - /// guarded by any mutex, it is just an atomic. - chassert(reserved_size <= entry.size); + if (!it) + return; + + const auto entry = it->getEntry(); + UNUSED(entry); + chassert(entry.size == reserved_size); + chassert(entry.key == key()); + chassert(entry.offset == offset()); + }; + + if (download_state == State::DOWNLOADED) + { + chassert(downloader_id.empty()); + chassert(downloaded_size == reserved_size); + chassert(std::filesystem::file_size(getPathInLocalCache()) > 0); + chassert(queue_iterator); + check_iterator(queue_iterator); } + else + { + if (download_state == State::DOWNLOADED) + { + chassert(!downloader_id.empty()); + } + else if (download_state == State::PARTIALLY_DOWNLOADED + || download_state == State::EMPTY) + { + chassert(downloader_id.empty()); + } + + chassert(reserved_size >= downloaded_size); + chassert((reserved_size == 0) || queue_iterator); + check_iterator(queue_iterator); + } + return true; } diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index e6aee5827e7..1ac09de23ad 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -72,7 +72,7 @@ public: using LocalCacheWriterPtr = std::unique_ptr; using Downloader = std::string; using DownloaderId = std::string; - using CachePriorityIterator = IFileCachePriority::Iterator; + using Priority = IFileCachePriority; enum class State { @@ -116,7 +116,7 @@ public: const CreateFileSegmentSettings & create_settings = {}, FileCache * cache_ = nullptr, std::weak_ptr key_metadata_ = std::weak_ptr(), - CachePriorityIterator queue_iterator_ = CachePriorityIterator{}); + Priority::Iterator queue_iterator_ = Priority::Iterator{}); ~FileSegment() = default; @@ -222,9 +222,9 @@ public: FileSegmentGuard::Lock lock() const { return segment_guard.lock(); } - CachePriorityIterator getQueueIterator() const; + Priority::Iterator getQueueIterator() const; - void setQueueIterator(CachePriorityIterator iterator); + void setQueueIterator(Priority::Iterator iterator); KeyMetadataPtr tryGetKeyMetadata() const; @@ -236,8 +236,6 @@ public: * ========== Methods that must do cv.notify() ================== */ - void setBroken(); - void complete(); void completePartAndResetDownloader(); @@ -285,6 +283,7 @@ private: void assertNotDetached() const; void assertNotDetachedUnlocked(const FileSegmentGuard::Lock &) const; void assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock &) const; + bool assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const; LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const; @@ -308,7 +307,7 @@ private: mutable FileSegmentGuard segment_guard; std::weak_ptr key_metadata; - mutable CachePriorityIterator queue_iterator; /// Iterator is put here on first reservation attempt, if successful. + mutable Priority::Iterator queue_iterator; /// Iterator is put here on first reservation attempt, if successful. FileCache * cache; std::condition_variable cv; diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 9396b8c6f44..02474c966cd 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -353,13 +353,10 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( auto metadata = getByOffset(offset); const auto & file_segment = metadata->file_segment; + chassert(file_segment->assertCorrectnessUnlocked(segment_lock)); const size_t downloaded_size = file_segment->getDownloadedSize(false); - const size_t full_size = file_segment->range().size(); - - chassert(downloaded_size <= file_segment->reserved_size); - - if (downloaded_size == full_size) + if (downloaded_size == file_segment->range().size()) { throw Exception( ErrorCodes::LOGICAL_ERROR, @@ -367,20 +364,17 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( file_segment->getInfoForLogUnlocked(segment_lock)); } - CreateFileSegmentSettings create_settings(file_segment->getKind()); - auto queue_iterator = file_segment->queue_iterator; + ssize_t diff = file_segment->reserved_size - downloaded_size; metadata->file_segment = std::make_shared( - getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, create_settings, - file_segment->cache, key_metadata, queue_iterator); + getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, + CreateFileSegmentSettings(file_segment->getKind()), + file_segment->cache, key_metadata, file_segment->queue_iterator); - chassert(queue_iterator->getEntry().size == file_segment->reserved_size); - ssize_t diff = file_segment->reserved_size - file_segment->downloaded_size; if (diff) - queue_iterator->updateSize(-diff); + metadata->getQueueIterator()->updateSize(-diff); - chassert(file_segment->reserved_size == downloaded_size); - chassert(metadata->size() == queue_iterator->getEntry().size); + chassert(file_segment->assertCorrectnessUnlocked(segment_lock)); } std::shared_ptr LockedKey::getByOffset(size_t offset) const diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h index ee650650488..1348ff31dc7 100644 --- a/src/Interpreters/Cache/Metadata.h +++ b/src/Interpreters/Cache/Metadata.h @@ -15,7 +15,6 @@ using CleanupQueuePtr = std::shared_ptr; struct FileSegmentMetadata : private boost::noncopyable { using Priority = IFileCachePriority; - using PriorityIterator = IFileCachePriority::Iterator; explicit FileSegmentMetadata(FileSegmentPtr && file_segment_); @@ -25,6 +24,8 @@ struct FileSegmentMetadata : private boost::noncopyable bool valid() const { return !removal_candidate.load(); } + Priority::Iterator getQueueIterator() { return file_segment->getQueueIterator(); } + FileSegmentPtr file_segment; std::atomic removal_candidate{false}; };