diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index bc8236f37fb..7d6df156cc0 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -823,10 +823,13 @@ bool FileCache::tryReserve( } EvictionCandidates eviction_candidates; + bool reached_size_limit = false; + bool reached_elements_limit = false; if (query_priority) { - if (!query_priority->collectCandidatesForEviction(size, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock)) + if (!query_priority->collectCandidatesForEviction( + size, reserve_stat, eviction_candidates, {}, user.user_id, reached_size_limit, reached_elements_limit, cache_lock)) return false; LOG_TEST(log, "Query limits satisfied (while reserving for {}:{})", file_segment.key(), file_segment.offset()); @@ -839,38 +842,41 @@ bool FileCache::tryReserve( auto queue_iterator = file_segment.getQueueIterator(); chassert(!queue_iterator || file_segment.getReservedSize() > 0); - if (!main_priority->collectCandidatesForEviction(size, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock)) + if (!main_priority->collectCandidatesForEviction( + size, reserve_stat, eviction_candidates, queue_iterator, user.user_id, reached_size_limit, reached_elements_limit, cache_lock)) return false; - /// Let's release cache lock if we are going to remove files from filesystem. - const bool release_lock = eviction_candidates.size() > 0; - if (release_lock) - cache_lock.unlock(); - if (!file_segment.getKeyMetadata()->createBaseDirectory()) return false; - try + if (eviction_candidates.size() > 0) { - /// Remove eviction candidates from filesystem. - eviction_candidates.evict(); - } - catch (...) - { - if (release_lock) + chassert(reached_size_limit || reached_elements_limit); + + std::unique_ptr hold_space; + if (!reached_size_limit) + hold_space = std::make_unique(size, 0, queue_iterator, *main_priority, cache_lock); + else if (!reached_elements_limit) + hold_space = std::make_unique(0, 1, queue_iterator, *main_priority, cache_lock); + + cache_lock.unlock(); + try + { + /// Remove eviction candidates from filesystem. + eviction_candidates.evict(); + } + catch (...) + { cache_lock.lock(); + /// Invalidate queue entries if some succeeded to be removed. + eviction_candidates.finalize(query_context.get(), cache_lock); + throw; + } - /// Invalidate queue entries if some succeeded to be removed. - eviction_candidates.finalize(query_context.get(), cache_lock); - throw; - } - - /// Take cache lock again. - if (release_lock) cache_lock.lock(); - - /// Invalidate and remove queue entries and execute (only for SLRU) finalize func. - eviction_candidates.finalize(query_context.get(), cache_lock); + /// Invalidate and remove queue entries and execute (only for SLRU) finalize func. + eviction_candidates.finalize(query_context.get(), cache_lock); + } /// Space reservation is incremental, so file_segment_metadata is created first (with state Empty), /// and queue_iterator is assigned on first space reservation attempt @@ -887,9 +893,6 @@ bool FileCache::tryReserve( file_segment.setQueueIterator(queue_iterator); } - file_segment.reserved_size += size; - chassert(file_segment.reserved_size == queue_iterator->getEntry()->size); - if (query_context) { auto query_queue_it = query_context->tryGet(file_segment.key(), file_segment.offset(), cache_lock); @@ -899,6 +902,9 @@ bool FileCache::tryReserve( query_context->add(file_segment.getKeyMetadata(), file_segment.offset(), size, user, cache_lock); } + file_segment.reserved_size += size; + chassert(file_segment.reserved_size == queue_iterator->getEntry()->size); + if (main_priority->getSize(cache_lock) > (1ull << 63)) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache became inconsistent. There must be a bug"); @@ -1161,7 +1167,7 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir) auto lock = lockCache(); size_limit = main_priority->getSizeLimit(lock); - limits_satisfied = main_priority->canFit(size, lock, nullptr, true); + limits_satisfied = main_priority->canFit(size, 1, lock, nullptr, true); if (limits_satisfied) cache_it = main_priority->add(key_metadata, offset, size, user, lock, /* best_effort */true); diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 8ea5f4dab40..f1384916cd0 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -46,6 +46,9 @@ struct FileCacheReserveStat } }; + bool reached_elements_limit = false; + bool reached_size_limit = false; + Stat stat; std::unordered_map stat_by_kind; diff --git a/src/Interpreters/Cache/IFileCachePriority.h b/src/Interpreters/Cache/IFileCachePriority.h index 651710656af..e1fb96c9a7d 100644 --- a/src/Interpreters/Cache/IFileCachePriority.h +++ b/src/Interpreters/Cache/IFileCachePriority.h @@ -102,6 +102,7 @@ public: /// for the corresponding file segment. virtual bool canFit( /// NOLINT size_t size, + size_t elements, const CachePriorityGuard::Lock &, IteratorPtr reservee = nullptr, bool best_effort = false) const = 0; @@ -120,15 +121,40 @@ public: size_t size, FileCacheReserveStat & stat, EvictionCandidates & res, - IFileCachePriority::IteratorPtr reservee, + IteratorPtr reservee, const UserID & user_id, + bool & reached_size_limit, + bool & reached_elements_limit, const CachePriorityGuard::Lock &) = 0; virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) = 0; + struct HoldSpace : boost::noncopyable + { + HoldSpace(size_t size_, size_t elements_, IteratorPtr reservee_, IFileCachePriority & priority_, const CachePriorityGuard::Lock & lock) + : size(size_), elements(elements_), reservee(reservee_), priority(priority_) + { + priority.holdImpl(size, elements, reservee, lock); + } + + ~HoldSpace() + { + priority.releaseImpl(size, elements, reservee); + } + + size_t size; + size_t elements; + IteratorPtr reservee; + IFileCachePriority & priority; + }; + HoldSpace takeHold(); + protected: IFileCachePriority(size_t max_size_, size_t max_elements_); + virtual void holdImpl(size_t size, size_t elements, IteratorPtr reservee, const CachePriorityGuard::Lock & lock) = 0; + virtual void releaseImpl(size_t size, size_t elements, IteratorPtr) = 0; + size_t max_size = 0; size_t max_elements = 0; }; diff --git a/src/Interpreters/Cache/LRUFileCachePriority.cpp b/src/Interpreters/Cache/LRUFileCachePriority.cpp index 6285dc684cf..e1004cc5245 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/LRUFileCachePriority.cpp @@ -219,21 +219,32 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CachePriorityGuard bool LRUFileCachePriority::canFit( /// NOLINT size_t size, + size_t elements, const CachePriorityGuard::Lock & lock, IteratorPtr, bool) const { - return canFit(size, 0, 0, lock); + return canFit(size, elements, 0, 0, nullptr, nullptr, lock); } bool LRUFileCachePriority::canFit( size_t size, + size_t elements, size_t released_size_assumption, size_t released_elements_assumption, + bool * reached_size_limit, + bool * reached_elements_limit, const CachePriorityGuard::Lock &) const { - return (max_size == 0 || (state->current_size + size - released_size_assumption <= max_size)) - && (max_elements == 0 || state->current_elements_num + 1 - released_elements_assumption <= max_elements); + const bool size_limit_satisifed = max_size == 0 || state->current_size + size - released_size_assumption <= max_size; + const bool elements_limit_satisfied = max_elements == 0 || state->current_elements_num + elements - released_elements_assumption <= max_elements; + + if (reached_size_limit) + *reached_size_limit |= !size_limit_satisifed; + if (reached_elements_limit) + *reached_elements_limit |= !elements_limit_satisfied; + + return size_limit_satisifed && elements_limit_satisfied; } bool LRUFileCachePriority::collectCandidatesForEviction( @@ -242,9 +253,11 @@ bool LRUFileCachePriority::collectCandidatesForEviction( EvictionCandidates & res, IFileCachePriority::IteratorPtr, const UserID &, + bool & reached_size_limit, + bool & reached_elements_limit, const CachePriorityGuard::Lock & lock) { - if (canFit(size, lock)) + if (canFit(size, 1, 0, 0, &reached_size_limit, &reached_elements_limit, lock)) return true; ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries); @@ -270,7 +283,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction( auto can_fit = [&] { - return canFit(size, stat.stat.releasable_size, stat.stat.releasable_count, lock); + return canFit(size, 1, stat.stat.releasable_size, stat.stat.releasable_count, nullptr, nullptr, lock); }; iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata) @@ -450,4 +463,23 @@ void LRUFileCachePriority::shuffle(const CachePriorityGuard::Lock &) queue.splice(queue.end(), queue, it); } +void LRUFileCachePriority::holdImpl(size_t size, size_t elements, IteratorPtr, const CachePriorityGuard::Lock & lock) +{ + if (!canFit(size, elements, lock)) + { + throw Exception(ErrorCodes::LOGICAL_ERROR, + "Cannot take space {}. Current state {}/{} in size, {}/{} in elements", + size, state->current_size, max_size, state->current_elements_num, max_elements); + } + + state->current_size += size; + state->current_elements_num += elements; +} + +void LRUFileCachePriority::releaseImpl(size_t size, size_t elements, IteratorPtr) +{ + state->current_size -= size; + state->current_elements_num -= elements; +} + } diff --git a/src/Interpreters/Cache/LRUFileCachePriority.h b/src/Interpreters/Cache/LRUFileCachePriority.h index d5151b6bbff..d292ff89556 100644 --- a/src/Interpreters/Cache/LRUFileCachePriority.h +++ b/src/Interpreters/Cache/LRUFileCachePriority.h @@ -30,6 +30,7 @@ public: bool canFit( /// NOLINT size_t size, + size_t elements, const CachePriorityGuard::Lock &, IteratorPtr reservee = nullptr, bool best_effort = false) const override; @@ -48,6 +49,8 @@ public: EvictionCandidates & res, IFileCachePriority::IteratorPtr reservee, const UserID & user_id, + bool & reached_size_limit, + bool & reached_elements_limit, const CachePriorityGuard::Lock &) override; void shuffle(const CachePriorityGuard::Lock &) override; @@ -76,7 +79,14 @@ private: void updateElementsCount(int64_t num); void updateSize(int64_t size); - bool canFit(size_t size, size_t released_size_assumption, size_t released_elements_assumption, const CachePriorityGuard::Lock &) const; + bool canFit( + size_t size, + size_t elements, + size_t released_size_assumption, + size_t released_elements_assumption, + bool * reached_size_limit, + bool * reached_elements_limit, + const CachePriorityGuard::Lock &) const; LRUQueue::iterator remove(LRUQueue::iterator it, const CachePriorityGuard::Lock &); @@ -91,6 +101,9 @@ private: LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CachePriorityGuard::Lock &); LRUIterator add(EntryPtr entry, const CachePriorityGuard::Lock &); + + void holdImpl(size_t size, size_t elements, IteratorPtr, const CachePriorityGuard::Lock & lock) override; + void releaseImpl(size_t size, size_t elements, IteratorPtr) override; }; class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator diff --git a/src/Interpreters/Cache/SLRUFileCachePriority.cpp b/src/Interpreters/Cache/SLRUFileCachePriority.cpp index 57c4a888025..0bc9826f581 100644 --- a/src/Interpreters/Cache/SLRUFileCachePriority.cpp +++ b/src/Interpreters/Cache/SLRUFileCachePriority.cpp @@ -10,6 +10,11 @@ namespace DB { +namespace ErrorCodes +{ + extern const int LOGICAL_ERROR; +} + namespace { size_t getRatio(size_t total, double ratio) @@ -46,23 +51,24 @@ size_t SLRUFileCachePriority::getElementsCount(const CachePriorityGuard::Lock & bool SLRUFileCachePriority::canFit( /// NOLINT size_t size, + size_t elements, const CachePriorityGuard::Lock & lock, IteratorPtr reservee, bool best_effort) const { if (best_effort) - return probationary_queue.canFit(size, lock) || protected_queue.canFit(size, lock); + return probationary_queue.canFit(size, elements, lock) || protected_queue.canFit(size, elements, lock); if (reservee) { const auto * slru_iterator = assert_cast(reservee.get()); if (slru_iterator->is_protected) - return protected_queue.canFit(size, lock); + return protected_queue.canFit(size, elements, lock); else - return probationary_queue.canFit(size, lock); + return probationary_queue.canFit(size, elements, lock); } else - return probationary_queue.canFit(size, lock); + return probationary_queue.canFit(size, elements, lock); } IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT @@ -78,7 +84,7 @@ IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT /// If it is server startup, we put entries in any queue it will fit in, /// but with preference for probationary queue, /// because we do not know the distribution between queues after server restart. - if (probationary_queue.canFit(size, lock)) + if (probationary_queue.canFit(size, 1, lock)) { auto lru_iterator = probationary_queue.add(std::make_shared(key_metadata->key, offset, size, key_metadata), lock); return std::make_shared(this, std::move(lru_iterator), false); @@ -102,13 +108,16 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( EvictionCandidates & res, IFileCachePriority::IteratorPtr reservee, const UserID & user_id, + bool & reached_size_limit, + bool & reached_elements_limit, const CachePriorityGuard::Lock & lock) { /// If `it` is nullptr, then it is the first space reservation attempt /// for a corresponding file segment, so it will be directly put into probationary queue. if (!reservee) { - return probationary_queue.collectCandidatesForEviction(size, stat, res, reservee, user_id, lock); + return probationary_queue.collectCandidatesForEviction( + size, stat, res, reservee, user_id, reached_size_limit, reached_elements_limit, lock); } /// If `it` not nullptr (e.g. is already in some queue), @@ -116,13 +125,14 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( /// (in order to know where we need to free space). if (!assert_cast(reservee.get())->is_protected) { - return probationary_queue.collectCandidatesForEviction(size, stat, res, reservee, user_id, lock); + return probationary_queue.collectCandidatesForEviction( + size, stat, res, reservee, user_id, reached_size_limit, reached_elements_limit, lock); } /// Entry is in protected queue. /// Check if we have enough space in protected queue to fit a new size of entry. /// `size` is the increment to the current entry.size we want to increase. - if (protected_queue.canFit(size, lock)) + if (protected_queue.canFit(size, 1, lock)) return true; /// If not enough space - we need to "downgrade" lowest priority entries from protected @@ -132,13 +142,17 @@ bool SLRUFileCachePriority::collectCandidatesForEviction( auto downgrade_candidates = std::make_shared(); FileCacheReserveStat downgrade_stat; - if (!protected_queue.collectCandidatesForEviction(size, downgrade_stat, *downgrade_candidates, reservee, user_id, lock)) + if (!protected_queue.collectCandidatesForEviction( + size, downgrade_stat, *downgrade_candidates, reservee, user_id, reached_size_limit, reached_elements_limit, lock)) return false; const size_t size_to_downgrade = downgrade_stat.stat.releasable_size; - if (!probationary_queue.canFit(size_to_downgrade, lock) - && !probationary_queue.collectCandidatesForEviction(size_to_downgrade, stat, res, reservee, user_id, lock)) + bool reached_size_limit_noop; + bool reached_elements_limit_noop; + if (!probationary_queue.canFit(size_to_downgrade, 1, lock) + && !probationary_queue.collectCandidatesForEviction( + size_to_downgrade, stat, res, reservee, user_id, reached_size_limit_noop, reached_elements_limit_noop, lock)) return false; res.setFinalizeEvictionFunc([=, this](const CachePriorityGuard::Lock & lk) mutable @@ -185,7 +199,11 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach EvictionCandidates downgrade_candidates; FileCacheReserveStat downgrade_stat; - if (!protected_queue.collectCandidatesForEviction(size, downgrade_stat, downgrade_candidates, {}, "", lock)) + bool reached_size_limit_noop; + bool reached_elements_limit_noop; + + if (!protected_queue.collectCandidatesForEviction( + size, downgrade_stat, downgrade_candidates, {}, "", reached_size_limit_noop, reached_elements_limit_noop, lock)) { /// We cannot make space for entry to be moved to protected queue /// (not enough releasable file segments). @@ -208,7 +226,8 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach if (size_to_free) { - if (!probationary_queue.collectCandidatesForEviction(size_to_free, stat, eviction_candidates, {}, {}, lock)) + if (!probationary_queue.collectCandidatesForEviction( + size_to_free, stat, eviction_candidates, {}, {}, reached_size_limit_noop, reached_elements_limit_noop, lock)) { /// "downgrade" candidates cannot be moved to probationary queue, /// so entry cannot be moved to protected queue as well. @@ -321,4 +340,34 @@ void SLRUFileCachePriority::SLRUIterator::assertValid() const lru_iterator.assertValid(); } +void SLRUFileCachePriority::holdImpl(size_t size, size_t elements, IteratorPtr reservee, const CachePriorityGuard::Lock & lock) +{ + if (!canFit(size, elements, lock, reservee)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot hold space {}", size); + + if (reservee) + { + const auto * slru_iterator = assert_cast(reservee.get()); + if (slru_iterator->is_protected) + return protected_queue.holdImpl(size, elements, reservee, lock); + else + return probationary_queue.holdImpl(size, elements, reservee, lock); + } + else + return probationary_queue.holdImpl(size, elements, reservee, lock); +} + +void SLRUFileCachePriority::releaseImpl(size_t size, size_t elements, IteratorPtr reservee) +{ + if (reservee) + { + const auto * slru_iterator = assert_cast(reservee.get()); + if (slru_iterator->is_protected) + return protected_queue.releaseImpl(size, elements, reservee); + else + return probationary_queue.releaseImpl(size, elements, reservee); + } + else + return probationary_queue.releaseImpl(size, elements, reservee); +} } diff --git a/src/Interpreters/Cache/SLRUFileCachePriority.h b/src/Interpreters/Cache/SLRUFileCachePriority.h index 7b8ae77cc8f..286a2e7b9fd 100644 --- a/src/Interpreters/Cache/SLRUFileCachePriority.h +++ b/src/Interpreters/Cache/SLRUFileCachePriority.h @@ -27,6 +27,7 @@ public: bool canFit( /// NOLINT size_t size, + size_t elements, const CachePriorityGuard::Lock &, IteratorPtr reservee = nullptr, bool best_effort = false) const override; @@ -45,6 +46,8 @@ public: EvictionCandidates & res, IFileCachePriority::IteratorPtr reservee, const UserID & user_id, + bool & reached_size_limit, + bool & reached_elements_limit, const CachePriorityGuard::Lock &) override; void shuffle(const CachePriorityGuard::Lock &) override; @@ -60,6 +63,9 @@ private: LoggerPtr log = getLogger("SLRUFileCachePriority"); void increasePriority(SLRUIterator & iterator, const CachePriorityGuard::Lock & lock); + + void holdImpl(size_t size, size_t elements, IteratorPtr reservee, const CachePriorityGuard::Lock & lock) override; + void releaseImpl(size_t size, size_t elements, IteratorPtr reservee) override; }; class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator