diff --git a/src/Interpreters/Cache/EvictionCandidates.cpp b/src/Interpreters/Cache/EvictionCandidates.cpp index e9321c1a5ae..c9970f60288 100644 --- a/src/Interpreters/Cache/EvictionCandidates.cpp +++ b/src/Interpreters/Cache/EvictionCandidates.cpp @@ -18,29 +18,41 @@ namespace ErrorCodes EvictionCandidates::~EvictionCandidates() { + /// Here `queue_entries_to_invalidate` contains queue entries + /// for file segments which were successfully removed in evict(). + /// This set is non-empty in desctructor only if there was + /// an exception before we called finalize() or in the middle of finalize(). for (const auto & iterator : queue_entries_to_invalidate) { - /// If there was an exception between evict and finalize phase - /// for some eviction candidate, we need to reset its entry now. + /// In this case we need to finalize the state of queue entries + /// which correspond to removed files segments to make sure + /// consistent state of cache. iterator->invalidate(); } + /// Here `candidates` contain only those file segments + /// which failed to be removed during evict() + /// because there was some exception before evict() + /// or in the middle of evict(). for (const auto & [key, key_candidates] : candidates) { + // Reset the evicting state + // (as the corresponding file segments were not yet removed). for (const auto & candidate : key_candidates.candidates) candidate->setEvicting(false, nullptr, nullptr); } } -void EvictionCandidates::add(const FileSegmentMetadataPtr & candidate, LockedKey & locked_key, const CachePriorityGuard::Lock & lock) +void EvictionCandidates::add( + const FileSegmentMetadataPtr & candidate, + LockedKey & locked_key, + const CachePriorityGuard::Lock & lock) { - chassert(lock.owns_lock()); - auto [it, inserted] = candidates.emplace(locked_key.getKey(), KeyCandidates{}); if (inserted) it->second.key_metadata = locked_key.getKeyMetadata(); - it->second.candidates.push_back(candidate); + it->second.candidates.push_back(candidate); candidate->setEvicting(true, &locked_key, &lock); ++candidates_size; } @@ -57,7 +69,11 @@ void EvictionCandidates::evict() { auto locked_key = key_candidates.key_metadata->tryLock(); if (!locked_key) - continue; /// key could become invalid after we released the key lock above, just skip it. + { + /// key could become invalid after we released + /// the key lock above, just skip it. + continue; + } while (!key_candidates.candidates.empty()) { @@ -71,18 +87,29 @@ void EvictionCandidates::evict() ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments); ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->range().size()); - /// We remove file segment, but do not invalidate queue entry now, - /// we will invalidate it after all eviction candidates are removed (in finalize() method), - /// because invalidation of queue entries needs to be done under cache lock. - /// Why? - /// Firstly, as long as queue entry exists, the corresponding space in cache is considered to be hold, - /// and once it is invalidated - the space is released. - /// Secondly, after evict() and finalize() stages we will also add back "reserved size" - /// (<= actually released size), but until we do this - we cannot allow other threads to think that - /// this released space is free, as it is not - it is removed in favour of some reserver - /// so we can make it visibly free only for that particular reserver. locked_key->removeFileSegment( - segment->offset(), segment->lock(), /* can_be_broken */false, /* invalidate_queue_entry */false); + segment->offset(), segment->lock(), + false/* can_be_broken */, false/* invalidate_queue_entry */); + + /// We set invalidate_queue_entry = false in removeFileSegment() above, because: + /// evict() is done without a cache priority lock while finalize() is done under the lock. + /// In evict() we: + /// - remove file segment from filesystem + /// - remove it from cache metadata + /// In finalize() we: + /// - remove corresponding queue entry from priority queue + /// + /// We do not invalidate queue entry now in evict(), + /// because invalidation of queue entries needs to be done under cache lock. + /// Why? Firstly, as long as queue entry exists, + /// the corresponding space in cache is considered to be hold, + /// and once queue entry is removed/invalidated - the space is released. + /// Secondly, after evict() and finalize() stages we will also add back the + /// "reserved size" (<= actually released size), + /// but until we do this - we cannot allow other threads to think that + /// this released space is free to take, as it is not - + /// it was freed in favour of some reserver, so we can make it visibly + /// free only for that particular reserver. queue_entries_to_invalidate.push_back(iterator); key_candidates.candidates.pop_back(); @@ -90,12 +117,18 @@ void EvictionCandidates::evict() } } -void EvictionCandidates::finalize(FileCacheQueryLimit::QueryContext * query_context, const CachePriorityGuard::Lock & lock) +void EvictionCandidates::finalize( + FileCacheQueryLimit::QueryContext * query_context, + const CachePriorityGuard::Lock & lock) { + chassert(lock.owns_lock()); + + /// Release the hold space. It was hold only for the duration of evict() phase, + /// now we can release. It might also be needed for on_finalize func, + /// so release the space it firtst. if (hold_space) hold_space->release(); - chassert(lock.owns_lock()); while (!queue_entries_to_invalidate.empty()) { auto iterator = queue_entries_to_invalidate.back(); @@ -108,7 +141,6 @@ void EvictionCandidates::finalize(FileCacheQueryLimit::QueryContext * query_cont const auto & entry = iterator->getEntry(); query_context->remove(entry->key, entry->offset, lock); } - /// Remove entry from main priority queue. iterator->remove(lock); } @@ -129,7 +161,8 @@ void EvictionCandidates::setSpaceHolder( { if (hold_space) throw Exception(ErrorCodes::LOGICAL_ERROR, "Space hold is already set"); - hold_space = std::make_unique(size, elements, priority, lock); + else + hold_space = std::make_unique(size, elements, priority, lock); } } diff --git a/src/Interpreters/Cache/EvictionCandidates.h b/src/Interpreters/Cache/EvictionCandidates.h index e27ff0915fe..2745d508a5d 100644 --- a/src/Interpreters/Cache/EvictionCandidates.h +++ b/src/Interpreters/Cache/EvictionCandidates.h @@ -11,13 +11,18 @@ public: ~EvictionCandidates(); - void add(const FileSegmentMetadataPtr & candidate, LockedKey & locked_key, const CachePriorityGuard::Lock &); + void add( + const FileSegmentMetadataPtr & candidate, + LockedKey & locked_key, + const CachePriorityGuard::Lock &); void evict(); void onFinalize(FinalizeEvictionFunc && func) { on_finalize.emplace_back(std::move(func)); } - void finalize(FileCacheQueryLimit::QueryContext * query_context, const CachePriorityGuard::Lock &); + void finalize( + FileCacheQueryLimit::QueryContext * query_context, + const CachePriorityGuard::Lock &); size_t size() const { return candidates_size; } diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index c425a39d637..71dc0cca3a7 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -830,11 +830,26 @@ bool FileCache::tryReserve( file_segment.key(), file_segment.offset()); } + auto queue_iterator = file_segment.getQueueIterator(); + + /// A file_segment_metadata acquires a priority iterator + /// on first successful space reservation attempt, + /// so queue_iterator == nullptr, if no space reservation took place yet. + chassert(!queue_iterator || file_segment.getReservedSize() > 0); + + /// If it is the first space reservatiob attempt for a file segment + /// we need to make space for 1 element in cache, + /// otherwise space is already taken and we need 0 elements to free. + size_t required_elements_num = queue_iterator ? 0 : 1; + EvictionCandidates eviction_candidates; + + /// If user has configured fs cache limit per query, + /// we take into account query limits here. if (query_priority) { if (!query_priority->collectCandidatesForEviction( - size, 1, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock)) + size, required_elements_num, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock)) { return false; } @@ -847,12 +862,8 @@ bool FileCache::tryReserve( reserve_stat.stat_by_kind.clear(); } - /// A file_segment_metadata acquires a priority iterator on first successful space reservation attempt, - auto queue_iterator = file_segment.getQueueIterator(); - chassert(!queue_iterator || file_segment.getReservedSize() > 0); - if (!main_priority->collectCandidatesForEviction( - size, 1, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock)) + size, required_elements_num, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock)) { return false; } @@ -878,33 +889,29 @@ bool FileCache::tryReserve( cache_lock.lock(); - /// Invalidate and remove queue entries and execute (only for SLRU) finalize func. + /// Invalidate and remove queue entries and execute finalize func. eviction_candidates.finalize(query_context.get(), cache_lock); } - else if (!main_priority->canFit(size, /* elements */queue_iterator ? 0 : 1, cache_lock, queue_iterator)) + else if (!main_priority->canFit(size, required_elements_num, cache_lock, queue_iterator)) { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Cannot fit {} in cache, but collection of eviction candidates succeeded. " - "This is a bug. Queue entry type: {}. Cache info: {}", - size, queue_iterator ? queue_iterator->getType() : FileCacheQueueEntryType::None, - main_priority->getStateInfoForLog(cache_lock)); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Cannot fit {} in cache, but collection of eviction candidates succeeded with no candidates. " + "This is a bug. Queue entry type: {}. Cache info: {}", + size, queue_iterator ? queue_iterator->getType() : FileCacheQueueEntryType::None, + main_priority->getStateInfoForLog(cache_lock)); } - /// Space reservation is incremental, so file_segment_metadata is created first (with state Empty), - /// and queue_iterator is assigned on first space reservation attempt - /// (so it is nullptr here if we are reserving for the first time). if (queue_iterator) { /// Increase size of queue entry. queue_iterator->incrementSize(size, cache_lock); - main_priority->check(cache_lock); } else { /// Create a new queue entry and assign currently reserved size to it. queue_iterator = main_priority->add(file_segment.getKeyMetadata(), file_segment.offset(), size, user, cache_lock); file_segment.setQueueIterator(queue_iterator); - main_priority->check(cache_lock); } main_priority->check(cache_lock); diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 717bb51cf71..4ee456cce72 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -46,9 +46,6 @@ struct FileCacheReserveStat } }; - bool reached_elements_limit = false; - bool reached_size_limit = false; - Stat total_stat; std::unordered_map stat_by_kind; diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h index 32966687b98..c12f79e3d7a 100644 --- a/src/Interpreters/Cache/IFileCachePriority.h +++ b/src/Interpreters/Cache/IFileCachePriority.h @@ -149,6 +149,9 @@ public: virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) = 0; + /// A space holder implementation, which allows to take hold of + /// some space in cache given that this space was freed. + /// Takes hold of the space in constructor and releases it in destructor. struct HoldSpace : private boost::noncopyable { HoldSpace( diff --git a/src/Interpreters/Cache/LRUFileCachePriority.cpp b/src/Interpreters/Cache/LRUFileCachePriority.cpp index fabe003aae6..e65c102f1e3 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/LRUFileCachePriority.cpp @@ -171,12 +171,17 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityGuard if (entry.size == 0) { + /// entry.size == 0 means that queue entry was invalidated, + /// valid (active) queue entries always have size > 0, + /// so we can safely remove it. it = remove(it, lock); continue; } if (entry.isEvicting(lock)) { + /// Skip queue entries which are in evicting state. + /// We threat them the same way as deleted entries. ++it; ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedEvictingFileSegments); continue; @@ -185,6 +190,10 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityGuard auto locked_key = entry.key_metadata->tryLock(); if (!locked_key || entry.size == 0) { + /// locked_key == nullptr means that the cache key of + /// the file segment of this queue entry no longer exists. + /// This is normal if the key was removed from metadata, + /// while queue entries can be removed lazily (with delay). it = remove(it, lock); continue; } @@ -192,6 +201,9 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityGuard auto metadata = locked_key->tryGetByOffset(entry.offset); if (!metadata) { + /// Same as explained in comment above, metadata == nullptr, + /// if file segment was removed from cache metadata, + /// but queue entry still exists because it is lazily removed. it = remove(it, lock); continue; } @@ -294,6 +306,15 @@ bool LRUFileCachePriority::collectCandidatesForEviction( if (can_fit()) { + /// As eviction is done without a cache priority lock, + /// then if some space was partially available and some needed + /// to be freed via eviction, we need to make sure that this + /// partially available space is still available + /// after we finish with eviction for non-available space. + /// So we create a space holder for the currently available part + /// of the required space for the duration of eviction of the other + /// currently non-available part of the space. + const size_t hold_size = size > stat.total_stat.releasable_size ? size - stat.total_stat.releasable_size : 0; @@ -519,7 +540,7 @@ void LRUFileCachePriority::holdImpl( state->current_size += size; state->current_elements_num += elements; - LOG_TEST(log, "Hold {} by size and {} by elements", size, elements); + // LOG_TEST(log, "Hold {} by size and {} by elements", size, elements); } void LRUFileCachePriority::releaseImpl(size_t size, size_t elements) @@ -529,7 +550,7 @@ void LRUFileCachePriority::releaseImpl(size_t size, size_t elements) state->current_size -= size; state->current_elements_num -= elements; - LOG_TEST(log, "Released {} by size and {} by elements", size, elements); + // LOG_TEST(log, "Released {} by size and {} by elements", size, elements); } } diff --git a/src/Interpreters/Cache/SLRUFileCachePriority.cpp b/src/Interpreters/Cache/SLRUFileCachePriority.cpp index 0c5a891a7b5..1400d3219c6 100644 --- a/src/Interpreters/Cache/SLRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/SLRUFileCachePriority.cpp @@ -103,7 +103,7 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT /// If it is server startup, we put entries in any queue it will fit in, /// but with preference for probationary queue, /// because we do not know the distribution between queues after server restart. - if (probationary_queue.canFit(size, 1, lock)) + if (probationary_queue.canFit(size, /* elements */1, lock)) { auto lru_iterator = probationary_queue.add(std::make_shared(key_metadata->key, offset, size, key_metadata), lock); iterator = std::make_shared(this, std::move(lru_iterator), false); @@ -122,9 +122,10 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT if (getSize(lock) > max_size || getElementsCount(lock) > max_elements) { - throw Exception(ErrorCodes::LOGICAL_ERROR, - "Went beyond limits. Added {} for {} element ({}:{}). Current state: {}", - size, iterator->getType(), key_metadata->key, offset, getStateInfoForLog(lock)); + throw Exception( + ErrorCodes::LOGICAL_ERROR, "Violated cache limits. " + "Added {} for {} element ({}:{}). Current state: {}", + size, iterator->getType(), key_metadata->key, offset, getStateInfoForLog(lock)); } return iterator; @@ -139,7 +140,7 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( const UserID & user_id, const CachePriorityGuard::Lock & lock) { - /// If `it` is nullptr, then it is the first space reservation attempt + /// If `reservee` is nullptr, then it is the first space reservation attempt /// for a corresponding file segment, so it will be directly put into probationary queue. if (!reservee) { @@ -149,7 +150,7 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( auto * slru_iterator = assert_cast(reservee.get()); bool success = false; - /// If `it` not nullptr (e.g. is already in some queue), + /// If `reservee` is not nullptr (e.g. is already in some queue), /// we need to check in which queue (protected/probationary) it currently is /// (in order to know where we need to free space). if (!slru_iterator->is_protected) @@ -163,10 +164,18 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( /// Entry is in protected queue. /// Check if we have enough space in protected queue to fit a new size of entry. /// `size` is the increment to the current entry.size we want to increase. - /// Here `elements` is 0, because entry is already in the protected queue. success = collectCandidatesForEvictionInProtected(size, elements, stat, res, reservee, user_id, lock); } + /// We eviction_candidates (res) set is non-empty and + /// space reservation was successful, we will do eviction from filesystem + /// which is executed without a cache priority lock. + /// So we made space reservation from a certain queue (protected or probationary), + /// but we should remember that there is an increasePriority operation + /// which can be called concurrently to space reservation. + /// This operation can move elements from one queue to another. + /// We need to make sure that this does not happen for the elements + /// which are in the process of unfinished space reservation. if (success && res.size() > 0) { slru_iterator->movable = false; @@ -249,6 +258,12 @@ void SLRUFileCachePriority::downgrade(IteratorPtr iterator, const CachePriorityG "Cannot downgrade {}: it is already in probationary queue", candidate_it->getEntry()->toString()); } + if (!candidate_it->movable) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot downgrade {}: it is non-movable", + candidate_it->getEntry()->toString()); + } const size_t entry_size = candidate_it->entry->size; if (!probationary_queue.canFit(entry_size, 1, lock)) @@ -273,12 +288,6 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach return; } - /// Iterator can me marked as non-movable in case we are concurrently - /// reserving space for it. It means that we start space reservation, - /// prepare space in probationary queue, then do eviction without lock, - /// then take the lock again to finalize the eviction and we need to be sure - /// that the element is still in probationary queue. - /// Therefore we forbid concurrent priority increase for probationary entries. if (!iterator.movable) { iterator.lru_iterator.increasePriority(lock); @@ -313,7 +322,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach /// queue to probationary queue. /// if (!collectCandidatesForEvictionInProtected( - entry->size, 1, stat, eviction_candidates, nullptr, FileCache::getInternalUser().user_id, lock)) + entry->size, /* elements */1, stat, eviction_candidates, nullptr, FileCache::getInternalUser().user_id, lock)) { /// "downgrade" candidates cannot be moved to probationary queue, /// so entry cannot be moved to protected queue as well. diff --git a/src/Interpreters/Cache/SLRUFileCachePriority.h b/src/Interpreters/Cache/SLRUFileCachePriority.h index 89eb1f0abd8..4cf5bb0f199 100644 --- a/src/Interpreters/Cache/SLRUFileCachePriority.h +++ b/src/Interpreters/Cache/SLRUFileCachePriority.h @@ -121,6 +121,15 @@ private: /// but needed only in order to do FileSegment::getInfo() without any lock, /// which is done for system tables and logging. std::atomic is_protected; + /// Iterator can me marked as non-movable in case we are reserving + /// space for it. It means that we start space reservation + /// and prepare space in probationary queue, then do eviction without lock, + /// then take the lock again to finalize the eviction and we need to be sure + /// that the element is still in probationary queue. + /// Therefore we forbid concurrent priority increase for probationary entries. + /// Same goes for the downgrade of queue entries from protected to probationary. + /// (For downgrade there is no explicit check because it will fall into unreleasable state, + /// e.g. will not be taken for eviction anyway). bool movable{true}; };