diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 76145afe35b..82128e681e4 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -513,8 +513,6 @@ KeyMetadata::iterator FileCache::addFileSegment( "Failed to insert {}:{}: entry already exists", key, offset); } - if (state == FileSegment::State::DOWNLOADED) - chassert(file_segment_metadata_it->second->file_segment->getQueueIterator()); return file_segment_metadata_it; } catch (...) @@ -565,37 +563,30 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size) return PriorityIterationResult::REMOVE_AND_CONTINUE; chassert(file_segment_metadata->file_segment->getQueueIterator()); - chassert(entry.offset == file_segment_metadata->file_segment->offset()); - - auto iteration_result = PriorityIterationResult::CONTINUE; - const bool is_persistent = allow_persistent_files && file_segment_metadata->file_segment->isPersistent(); const bool releasable = file_segment_metadata->releasable() && !is_persistent; + if (releasable) { - auto current_file_segment = file_segment_metadata->file_segment; - const size_t file_segment_size = entry.size; + removed_size += entry.size; + --queue_size; - if (current_file_segment->state() == FileSegment::State::DOWNLOADED) + auto segment = file_segment_metadata->file_segment; + if (segment->state() == FileSegment::State::DOWNLOADED) { - const auto & key = current_file_segment->key(); + const auto & key = segment->key(); auto it = to_delete.find(key); if (it == to_delete.end()) it = to_delete.emplace(key, locked_key.getKeyMetadata()).first; it->second.add(file_segment_metadata); - } - else - { - /// TODO: we can resize if partially downloaded instead. 
- iteration_result = PriorityIterationResult::REMOVE_AND_CONTINUE; - locked_key.removeFileSegment(current_file_segment->offset(), current_file_segment->lock()); + return PriorityIterationResult::CONTINUE; } - removed_size += file_segment_size; - --queue_size; + /// TODO: we can resize if partially downloaded instead. + locked_key.removeFileSegment(segment->offset(), segment->lock()); + return PriorityIterationResult::REMOVE_AND_CONTINUE; } - - return iteration_result; + return PriorityIterationResult::CONTINUE; }; if (query_priority) @@ -676,12 +667,7 @@ bool FileCache::tryReserve(FileSegment & file_segment, size_t size) if (main_priority->getSize(cache_lock) > (1ull << 63)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); - const auto & key_metadata = file_segment.getKeyMetadata(); - if (!key_metadata->created_base_directory.exchange(true)) - { - fs::create_directories(metadata.getPathInLocalCache(file_segment.key())); - } - + file_segment.getKeyMetadata()->createBaseDirectory(); return true; } @@ -994,7 +980,16 @@ void FileCache::assertCacheCorrectness() { for (const auto & [offset, file_segment_metadata] : locked_key) { - locked_key.assertFileSegmentCorrectness(*file_segment_metadata->file_segment); + const auto & file_segment = *file_segment_metadata->file_segment; + + if (file_segment.key() != locked_key.getKey()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Expected {} = {}", file_segment.key(), locked_key.getKey()); + } + + file_segment.assertCorrectness(); } }); } diff --git a/src/Interpreters/Cache/FileSegment.cpp b/src/Interpreters/Cache/FileSegment.cpp index cd6fed39c32..34de2c71bde 100644 --- a/src/Interpreters/Cache/FileSegment.cpp +++ b/src/Interpreters/Cache/FileSegment.cpp @@ -138,11 +138,6 @@ size_t FileSegment::getDownloadedSize(bool sync) const void FileSegment::setDownloadedSize(size_t delta) { auto lock = segment_guard.lock(); - setDownloadedSizeUnlocked(delta, lock); -} - -void 
FileSegment::setDownloadedSizeUnlocked(size_t delta, const FileSegmentGuard::Lock &) -{ downloaded_size += delta; assert(downloaded_size == std::filesystem::file_size(getPathInLocalCache())); } @@ -196,7 +191,7 @@ String FileSegment::getOrSetDownloader() return current_downloader; } -void FileSegment::resetDownloadingStateUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock) +void FileSegment::resetDownloadingStateUnlocked(const FileSegmentGuard::Lock & lock) { assert(isDownloaderUnlocked(lock)); assert(download_state == State::DOWNLOADING); @@ -213,6 +208,8 @@ void FileSegment::resetDownloader() { auto lock = segment_guard.lock(); + SCOPE_EXIT({ cv.notify_all(); }); + assertNotDetachedUnlocked(lock); assertIsDownloaderUnlocked("resetDownloader", lock); @@ -224,7 +221,6 @@ void FileSegment::resetDownloaderUnlocked(const FileSegmentGuard::Lock &) { LOG_TEST(log, "Resetting downloader from {}", downloader_id); downloader_id.clear(); - cv.notify_all(); } void FileSegment::assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock & lock) const @@ -292,17 +288,6 @@ void FileSegment::setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_) remote_file_reader = remote_file_reader_; } -void FileSegment::resetRemoteFileReader() -{ - auto lock = segment_guard.lock(); - assertIsDownloaderUnlocked("resetRemoteFileReader", lock); - - if (!remote_file_reader) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Remote file reader does not exist"); - - remote_file_reader.reset(); -} - void FileSegment::write(const char * from, size_t size, size_t offset) { if (!size) @@ -366,7 +351,7 @@ void FileSegment::write(const char * from, size_t size, size_t offset) { auto lock = segment_guard.lock(); - wrapWithCacheInfo(e, "while writing into cache", lock); + e.addMessage(fmt::format("while writing into cache, current cache state: {}", getInfoForLogUnlocked(lock))); setDownloadFailedUnlocked(lock); @@ -382,7 +367,7 @@ FileSegment::State FileSegment::wait(size_t 
offset) { auto lock = segment_guard.lock(); - if (downloader_id.empty()) + if (downloader_id.empty() || offset < getCurrentWriteOffset(true)) return download_state; if (download_state == State::EMPTY) @@ -395,7 +380,7 @@ FileSegment::State FileSegment::wait(size_t offset) chassert(!getDownloaderUnlocked(lock).empty()); chassert(!isDownloaderUnlocked(lock)); - [[maybe_unused]] const bool ok = cv.wait_for(lock, std::chrono::seconds(60), [&, this]() + cv.wait_for(lock, std::chrono::seconds(60), [&, this]() { return download_state != State::DOWNLOADING || offset < getCurrentWriteOffset(true); }); @@ -445,8 +430,6 @@ bool FileSegment::reserve(size_t size_to_reserve) { auto lock = segment_guard.lock(); - LOG_TRACE(log, "Try reserve for {}", getInfoForLogUnlocked(lock)); - assertNotDetachedUnlocked(lock); assertIsDownloaderUnlocked("reserve", lock); @@ -497,7 +480,7 @@ bool FileSegment::reserve(size_t size_to_reserve) return reserved; } -void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard::Lock & lock) +void FileSegment::setDownloadedUnlocked(const FileSegmentGuard::Lock &) { if (download_state == State::DOWNLOADED) return; @@ -517,7 +500,9 @@ void FileSegment::setDownloadedUnlocked([[maybe_unused]] const FileSegmentGuard: void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock) { - LOG_INFO(log, "Settings download as failed: {}", getInfoForLogUnlocked(lock)); + LOG_INFO(log, "Setting download as failed: {}", getInfoForLogUnlocked(lock)); + + SCOPE_EXIT({ cv.notify_all(); }); setDownloadState(State::PARTIALLY_DOWNLOADED_NO_CONTINUATION, lock); @@ -532,11 +517,9 @@ void FileSegment::setDownloadFailedUnlocked(const FileSegmentGuard::Lock & lock) void FileSegment::completePartAndResetDownloader() { auto lock = segment_guard.lock(); - completePartAndResetDownloaderUnlocked(lock); -} -void FileSegment::completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & lock) -{ + SCOPE_EXIT({ cv.notify_all(); }); + 
assertNotDetachedUnlocked(lock); assertIsDownloaderUnlocked("completePartAndResetDownloader", lock); @@ -550,6 +533,8 @@ void FileSegment::setBroken() { auto lock = segment_guard.lock(); + SCOPE_EXIT({ cv.notify_all(); }); + assertNotDetachedUnlocked(lock); assertIsDownloaderUnlocked("setBroken", lock); @@ -566,35 +551,27 @@ void FileSegment::complete() return; auto locked_key = lockKeyMetadata(false); - if (locked_key) + if (!locked_key) { - completeUnlocked(*locked_key); - return; + /// If we failed to lock a key, it must be in detached state. + if (isDetached()) + return; + + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete file segment: {}", getInfoForLog()); } - /// If we failed to lock a key, it must be in detached state. - if (isDetached()) - return; - - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot complete file segment: {}", getInfoForLog()); -} - -void FileSegment::completeUnlocked(LockedKey & locked_key) -{ auto segment_lock = segment_guard.lock(); if (isCompleted(false)) return; const bool is_downloader = isDownloaderUnlocked(segment_lock); - const bool is_last_holder = locked_key.isLastOwnerOfFileSegment(offset()); + const bool is_last_holder = locked_key->isLastOwnerOfFileSegment(offset()); const size_t current_downloaded_size = getDownloadedSize(true); SCOPE_EXIT({ if (is_downloader) - { cv.notify_all(); - } }); LOG_TEST( @@ -618,9 +595,9 @@ void FileSegment::completeUnlocked(LockedKey & locked_key) if (segment_kind == FileSegmentKind::Temporary && is_last_holder) { LOG_TEST(log, "Removing temporary file segment: {}", getInfoForLogUnlocked(segment_lock)); - detach(segment_lock, locked_key); + detach(segment_lock, *locked_key); setDownloadState(State::DETACHED, segment_lock); - locked_key.removeFileSegment(offset(), segment_lock); + locked_key->removeFileSegment(offset(), segment_lock); return; } @@ -644,12 +621,10 @@ void FileSegment::completeUnlocked(LockedKey & locked_key) { if (is_last_holder) { - setDownloadState(State::DETACHED, 
segment_lock); - if (current_downloaded_size == 0) { LOG_TEST(log, "Remove file segment {} (nothing downloaded)", range().toString()); - locked_key.removeFileSegment(offset(), segment_lock); + locked_key->removeFileSegment(offset(), segment_lock); } else { @@ -666,13 +641,13 @@ void FileSegment::completeUnlocked(LockedKey & locked_key) /// but current file segment should remain PARRTIALLY_DOWNLOADED_NO_CONTINUATION and with detached state, /// because otherwise an invariant that getOrSet() returns a contiguous range of file segments will be broken /// (this will be crucial for other file segment holder, not for current one). - locked_key.shrinkFileSegmentToDownloadedSize(offset(), segment_lock); + locked_key->shrinkFileSegmentToDownloadedSize(offset(), segment_lock); /// We mark current file segment with state DETACHED, even though the data is still in cache /// (but a separate file segment) because is_last_holder is satisfied, so it does not matter. } - detachAssumeStateFinalized(segment_lock); + setDetachedState(segment_lock); } break; } @@ -707,11 +682,6 @@ String FileSegment::getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const return info.str(); } -void FileSegment::wrapWithCacheInfo(Exception & e, const String & message, const FileSegmentGuard::Lock & lock) const -{ - e.addMessage(fmt::format("{}, current cache state: {}", message, getInfoForLogUnlocked(lock))); -} - String FileSegment::stateToString(FileSegment::State state) { switch (state) @@ -755,15 +725,6 @@ bool FileSegment::assertCorrectness() const return true; } -void FileSegment::throwIfDetachedUnlocked(const FileSegmentGuard::Lock & lock) const -{ - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Cache file segment is in detached state, operation not allowed. " - "It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. " - "Please, retry. 
File segment info: {}", getInfoForLogUnlocked(lock)); -} - void FileSegment::assertNotDetached() const { auto lock = segment_guard.lock(); @@ -773,7 +734,13 @@ void FileSegment::assertNotDetached() const void FileSegment::assertNotDetachedUnlocked(const FileSegmentGuard::Lock & lock) const { if (download_state == State::DETACHED) - throwIfDetachedUnlocked(lock); + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cache file segment is in detached state, operation not allowed. " + "It can happen when cache was concurrently dropped with SYSTEM DROP FILESYSTEM CACHE FORCE. " + "Please, retry. File segment info: {}", getInfoForLogUnlocked(lock)); + } } FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment) @@ -785,14 +752,12 @@ FileSegmentPtr FileSegment::getSnapshot(const FileSegmentPtr & file_segment) file_segment->offset(), file_segment->range().size(), State::DETACHED, - CreateFileSegmentSettings(file_segment->getKind())); + CreateFileSegmentSettings(file_segment->getKind(), file_segment->is_unbound)); snapshot->hits_count = file_segment->getHitsCount(); snapshot->downloaded_size = file_segment->getDownloadedSize(false); snapshot->download_state = file_segment->download_state.load(); - snapshot->ref_count = file_segment.use_count(); - snapshot->is_unbound = file_segment->is_unbound; return snapshot; } @@ -822,27 +787,32 @@ bool FileSegment::isCompleted(bool sync) const return is_completed_state(); } +void FileSegment::setDetachedState(const FileSegmentGuard::Lock & lock) +{ + setDownloadState(State::DETACHED, lock); + key_metadata.reset(); + cache = nullptr; +} + void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &) { if (download_state == State::DETACHED) return; - setDownloadState(State::DETACHED, lock); resetDownloaderUnlocked(lock); - - detachAssumeStateFinalized(lock); -} - -void FileSegment::detachAssumeStateFinalized(const FileSegmentGuard::Lock & lock) -{ - key_metadata.reset(); - LOG_TEST(log, "Detached 
file segment: {}", getInfoForLogUnlocked(lock)); + setDetachedState(lock); } void FileSegment::use() { + if (!cache) + { + chassert(isCompleted(true)); + return; + } + auto it = getQueueIterator(); - if (it && cache) + if (it) { auto cache_lock = cache->lockCache(); it->use(cache_lock); @@ -869,7 +839,7 @@ String FileSegmentsHolder::toString() if (!ranges.empty()) ranges += ", "; ranges += file_segment->range().toString(); - if (file_segment->is_unbound) + if (file_segment->isUnbound()) ranges += "(unbound)"; } return ranges; diff --git a/src/Interpreters/Cache/FileSegment.h b/src/Interpreters/Cache/FileSegment.h index 5fc3b45cc5e..67432a1de01 100644 --- a/src/Interpreters/Cache/FileSegment.h +++ b/src/Interpreters/Cache/FileSegment.h @@ -163,11 +163,10 @@ public: size_t offset() const { return range().left; } FileSegmentKind getKind() const { return segment_kind; } - bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; } - bool isUnbound() const { return is_unbound; } - using UniqueId = std::pair; - UniqueId getUniqueId() const { return std::pair(key(), offset()); } + bool isPersistent() const { return segment_kind == FileSegmentKind::Persistent; } + + bool isUnbound() const { return is_unbound; } String getPathInLocalCache() const; @@ -198,6 +197,8 @@ public: size_t getDownloadedSize(bool sync) const; + size_t getReservedSize() const; + /// Now detached status can be used in the following cases: /// 1. there is only 1 remaining file segment holder /// && it does not need this segment anymore @@ -218,12 +219,40 @@ public: bool isDetached() const; - /// File segment has a completed state, if this state is final and is not going to be changed. - /// Completed states: DOWNALODED, DETACHED. + /// File segment has a completed state, if this state is final and + /// is not going to be changed. Completed states: DOWNLOADED, DETACHED. 
bool isCompleted(bool sync = false) const; + void use(); + + /** + * ========== Methods used by `cache` ======================== + */ + + FileSegmentGuard::Lock lock() const { return segment_guard.lock(); } + + CachePriorityIterator getQueueIterator() const; + + void setQueueIterator(CachePriorityIterator iterator); + + KeyMetadataPtr tryGetKeyMetadata() const; + + KeyMetadataPtr getKeyMetadata() const; + bool assertCorrectness() const; + /** + * ========== Methods that must do cv.notify() ================== + */ + + void setBroken(); + + void complete(); + + void completePartAndResetDownloader(); + + void resetDownloader(); + /** * ========== Methods for _only_ file segment's `downloader` ================== */ @@ -240,70 +269,33 @@ public: /// Write data into reserved space. void write(const char * from, size_t size, size_t offset); - void setBroken(); - - void complete(); - - /// Complete file segment's part which was last written. - void completePartAndResetDownloader(); - - void resetDownloader(); - RemoteFileReaderPtr getRemoteFileReader(); RemoteFileReaderPtr extractRemoteFileReader(); void setRemoteFileReader(RemoteFileReaderPtr remote_file_reader_); - void resetRemoteFileReader(); - - FileSegmentGuard::Lock lock() const { return segment_guard.lock(); } - void setDownloadedSize(size_t delta); - size_t getReservedSize() const; - - CachePriorityIterator getQueueIterator() const; - - void setQueueIterator(CachePriorityIterator iterator); - - KeyMetadataPtr getKeyMetadata() const; - - void use(); - private: - String getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const; String getDownloaderUnlocked(const FileSegmentGuard::Lock &) const; + bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; void resetDownloaderUnlocked(const FileSegmentGuard::Lock &); - void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &); void setDownloadState(State state, const FileSegmentGuard::Lock &); - void setDownloadedSizeUnlocked(size_t 
delta, const FileSegmentGuard::Lock &); + void resetDownloadingStateUnlocked(const FileSegmentGuard::Lock &); + void setDetachedState(const FileSegmentGuard::Lock &); + + String getInfoForLogUnlocked(const FileSegmentGuard::Lock &) const; void setDownloadedUnlocked(const FileSegmentGuard::Lock &); void setDownloadFailedUnlocked(const FileSegmentGuard::Lock &); - bool isDetached(const FileSegmentGuard::Lock &) const { return download_state == State::DETACHED; } - void detachAssumeStateFinalized(const FileSegmentGuard::Lock &); - [[noreturn]] void throwIfDetachedUnlocked(const FileSegmentGuard::Lock &) const; - void assertNotDetached() const; void assertNotDetachedUnlocked(const FileSegmentGuard::Lock &) const; void assertIsDownloaderUnlocked(const std::string & operation, const FileSegmentGuard::Lock &) const; LockedKeyPtr lockKeyMetadata(bool assert_exists = true) const; - KeyMetadataPtr tryGetKeyMetadata() const; - - /// completeWithoutStateUnlocked() is called from destructor of FileSegmentsHolder. - /// Function might check if the caller of the method - /// is the last alive holder of the segment. Therefore, completion and destruction - /// of the file segment pointer must be done under the same cache mutex. 
- void completeUnlocked(LockedKey & locked_key); - - void completePartAndResetDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock); - bool isDownloaderUnlocked(const FileSegmentGuard::Lock & segment_lock) const; - - void wrapWithCacheInfo(Exception & e, const String & message, const FileSegmentGuard::Lock & segment_lock) const; Key file_key; Range segment_range; diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h index fb0ac152148..31a23e137ca 100644 --- a/src/Interpreters/Cache/IFileCachePriority.h +++ b/src/Interpreters/Cache/IFileCachePriority.h @@ -37,9 +37,6 @@ public: const size_t offset; size_t size; size_t hits = 0; - /// In fact, it is guaranteed that the lifetime of key metadata is longer - /// than Entry, but it is made as weak_ptr to avoid cycle in shared pointer - /// references (because entry actually lies in key metadata). const KeyMetadataPtr key_metadata; }; @@ -52,15 +49,17 @@ public: public: virtual ~IIterator() = default; + virtual size_t use(const CacheGuard::Lock &) = 0; + + virtual std::shared_ptr remove(const CacheGuard::Lock &) = 0; + virtual const Entry & getEntry() const = 0; virtual Entry & getEntry() = 0; - virtual size_t use(const CacheGuard::Lock &) = 0; + virtual void annul() = 0; virtual void updateSize(ssize_t size) = 0; - - virtual std::shared_ptr remove(const CacheGuard::Lock &) = 0; }; using Iterator = std::shared_ptr; diff --git a/src/Interpreters/Cache/LRUFileCachePriority.cpp b/src/Interpreters/Cache/LRUFileCachePriority.cpp index 5b37191fba7..131db6714c3 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/LRUFileCachePriority.cpp @@ -95,7 +95,8 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock & for (auto it = queue.begin(); it != queue.end();) { auto locked_key = it->key_metadata->lock(); - if (locked_key->getKeyState() != KeyMetadata::KeyState::ACTIVE) + if (it->size == 0 + || 
locked_key->getKeyState() != KeyMetadata::KeyState::ACTIVE) { it = remove(it); continue; @@ -127,6 +128,14 @@ LRUFileCachePriority::Iterator LRUFileCachePriority::LRUFileCacheIterator::remov return std::make_shared(cache_priority, cache_priority->remove(queue_iter)); } +void LRUFileCachePriority::LRUFileCacheIterator::annul() +{ + /// Release the entry's bytes from the size metric too; once size is zeroed nothing is left to subtract later. + CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size); + cache_priority->current_size -= queue_iter->size; + queue_iter->size = 0; +} + void LRUFileCachePriority::LRUFileCacheIterator::updateSize(ssize_t size) { cache_priority->current_size += size; @@ -135,6 +144,9 @@ void LRUFileCachePriority::LRUFileCacheIterator::updateSize(ssize_t size) else CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, size); queue_iter->size += size; + + chassert(cache_priority->current_size >= 0); + chassert(queue_iter->size >= 0); } size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CacheGuard::Lock &) diff --git a/src/Interpreters/Cache/LRUFileCachePriority.h b/src/Interpreters/Cache/LRUFileCachePriority.h index af50d00fcb5..20e3a8ffe4b 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.h +++ b/src/Interpreters/Cache/LRUFileCachePriority.h @@ -56,6 +56,8 @@ public: Iterator remove(const CacheGuard::Lock &) override; + void annul() override; + void updateSize(ssize_t size) override; private: diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 926c6de5d95..67a5f61ad6f 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -49,9 +49,11 @@ KeyMetadata::KeyMetadata( bool created_base_directory_) : key(key_) , key_path(key_path_) - , created_base_directory(created_base_directory_) , cleanup_queue(cleanup_queue_) + , created_base_directory(created_base_directory_) { + if (created_base_directory) + chassert(fs::exists(key_path)); } LockedKeyPtr KeyMetadata::lock() @@ -65,14 +67,40 @@ LockedKeyPtr KeyMetadata::lock() "Cannot lock key {} (state: {})", key, magic_enum::enum_name(key_state)); } +void 
KeyMetadata::createBaseDirectory() +{ + if (!created_base_directory.exchange(true)) + { + fs::create_directories(key_path); + } +} + std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) { return fs::path(key_path) / CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind()); } + +struct CleanupQueue +{ + friend struct CacheMetadata; +public: + void add(const FileCacheKey & key); + void remove(const FileCacheKey & key); + size_t getSize() const; + +private: + bool tryPop(FileCacheKey & key); + + std::unordered_set keys; + mutable std::mutex mutex; +}; + + CacheMetadata::CacheMetadata(const std::string & path_) : path(path_) + , cleanup_queue(std::make_unique()) , log(&Poco::Logger::get("CacheMetadata")) { } @@ -128,7 +156,7 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata( it = emplace( key, std::make_shared( - key, getPathInLocalCache(key), cleanup_queue, is_initial_load)).first; + key, getPathInLocalCache(key), *cleanup_queue, is_initial_load)).first; } key_metadata = it->second; @@ -198,7 +226,7 @@ void CacheMetadata::doCleanup() /// we perform this delayed removal. 
FileCacheKey cleanup_key; - while (cleanup_queue.tryPop(cleanup_key)) + while (cleanup_queue->tryPop(cleanup_key)) { auto it = find(cleanup_key); if (it == end()) @@ -259,15 +287,9 @@ void LockedKey::removeFromCleanupQueue() key_metadata->key_state = KeyMetadata::KeyState::ACTIVE; } -bool LockedKey::markAsRemoved() +void LockedKey::markAsRemoved() { - chassert(key_metadata->key_state != KeyMetadata::KeyState::REMOVED); - - if (key_metadata->key_state == KeyMetadata::KeyState::ACTIVE) - return false; - key_metadata->key_state = KeyMetadata::KeyState::REMOVED; - return true; } bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const @@ -301,13 +323,13 @@ KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegm auto file_segment = it->second->file_segment; if (file_segment->queue_iterator) - file_segment->queue_iterator->updateSize(-file_segment->queue_iterator->getEntry().size); + file_segment->queue_iterator->annul(); - const auto path = key_metadata->getFileSegmentPath(*it->second->file_segment); + const auto path = key_metadata->getFileSegmentPath(*file_segment); if (fs::exists(path)) fs::remove(path); - it->second->file_segment->detach(segment_lock, *this); + file_segment->detach(segment_lock, *this); return key_metadata->erase(it); } @@ -326,6 +348,8 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( const size_t downloaded_size = file_segment->getDownloadedSize(false); const size_t full_size = file_segment->range().size(); + chassert(downloaded_size <= file_segment->reserved_size); + if (downloaded_size == full_size) { throw Exception( @@ -334,16 +358,14 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( file_segment->getInfoForLogUnlocked(segment_lock)); } - auto queue_iterator = metadata->file_segment->queue_iterator; - - chassert(downloaded_size <= file_segment->reserved_size); - chassert(queue_iterator->getEntry().size == file_segment->reserved_size); - CreateFileSegmentSettings create_settings(file_segment->getKind()); + 
auto queue_iterator = file_segment->queue_iterator; + metadata->file_segment = std::make_shared( getKey(), offset, downloaded_size, FileSegment::State::DOWNLOADED, create_settings, - file_segment->cache, key_metadata, file_segment->queue_iterator); + file_segment->cache, key_metadata, queue_iterator); + chassert(queue_iterator->getEntry().size == file_segment->reserved_size); ssize_t diff = file_segment->reserved_size - file_segment->downloaded_size; if (diff) queue_iterator->updateSize(-diff); @@ -352,18 +374,6 @@ void LockedKey::shrinkFileSegmentToDownloadedSize( chassert(metadata->size() == queue_iterator->getEntry().size); } -void LockedKey::assertFileSegmentCorrectness(const FileSegment & file_segment) const -{ - if (file_segment.key() != getKey()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Expected {} = {}", file_segment.key(), getKey()); - } - - file_segment.assertCorrectness(); -} - std::shared_ptr LockedKey::getByOffset(size_t offset) const { auto it = key_metadata->find(offset); diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h index 71a905a259d..54dde8c9aee 100644 --- a/src/Interpreters/Cache/Metadata.h +++ b/src/Interpreters/Cache/Metadata.h @@ -12,6 +12,7 @@ using FileSegmentPtr = std::shared_ptr; struct LockedKey; using LockedKeyPtr = std::shared_ptr; struct CleanupQueue; +using CleanupQueuePtr = std::shared_ptr; struct FileSegmentMetadata : private boost::noncopyable @@ -56,37 +57,23 @@ struct KeyMetadata : public std::map, const Key key; const std::string key_path; - std::atomic created_base_directory = false; LockedKeyPtr lock(); + void createBaseDirectory(); + std::string getFileSegmentPath(const FileSegment & file_segment); private: KeyState key_state = KeyState::ACTIVE; KeyGuard guard; CleanupQueue & cleanup_queue; + std::atomic created_base_directory = false; }; using KeyMetadataPtr = std::shared_ptr; -struct CleanupQueue -{ - friend struct CacheMetadata; -public: - void add(const FileCacheKey & 
key); - void remove(const FileCacheKey & key); - size_t getSize() const; - -private: - bool tryPop(FileCacheKey & key); - - std::unordered_set keys; - mutable std::mutex mutex; -}; - - struct CacheMetadata : public std::unordered_map, private boost::noncopyable { public: @@ -124,7 +111,7 @@ public: private: const std::string path; /// Cache base path CacheMetadataGuard guard; - CleanupQueue cleanup_queue; + const CleanupQueuePtr cleanup_queue; Poco::Logger * log; }; @@ -162,26 +149,20 @@ struct LockedKey : private boost::noncopyable KeyMetadata::KeyState getKeyState() const { return key_metadata->key_state; } - KeyMetadataPtr getKeyMetadata() const { return key_metadata; } - KeyMetadataPtr getKeyMetadata() { return key_metadata; } - - KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &); + std::shared_ptr getKeyMetadata() const { return key_metadata; } + std::shared_ptr getKeyMetadata() { return key_metadata; } void removeAllReleasable(); + KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &); + void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &); bool isLastOwnerOfFileSegment(size_t offset) const; - void assertFileSegmentCorrectness(const FileSegment & file_segment) const; - - bool isRemovalCandidate() const; - - bool markAsRemovalCandidate(size_t offset); - void removeFromCleanupQueue(); - bool markAsRemoved(); + void markAsRemoved(); std::string toString() const;