This commit is contained in:
kssenii 2024-03-22 16:13:06 +01:00
parent 87b1e0ead2
commit 272ae1d6ea
10 changed files with 98 additions and 199 deletions

View File

@ -88,6 +88,9 @@ void EvictionCandidates::evict()
void EvictionCandidates::finalize(FileCacheQueryLimit::QueryContext * query_context, const CachePriorityGuard::Lock & lock)
{
for (auto & holder : hold_space)
holder->release();
chassert(lock.owns_lock());
while (!queue_entries_to_invalidate.empty())
{
@ -110,4 +113,14 @@ void EvictionCandidates::finalize(FileCacheQueryLimit::QueryContext * query_cont
finalize_eviction_func(lock);
}
void EvictionCandidates::setSpaceHolder(
size_t size,
size_t elements,
IFileCachePriority & priority,
const CachePriorityGuard::Lock & lock)
{
auto holder = std::make_unique<IFileCachePriority::HoldSpace>(size, elements, priority, lock);
hold_space.emplace_back(std::move(holder));
}
}

View File

@ -21,6 +21,12 @@ public:
auto end() const { return candidates.end(); }
void setSpaceHolder(
size_t size,
size_t elements,
IFileCachePriority & priority,
const CachePriorityGuard::Lock &);
using FinalizeEvictionFunc = std::function<void(const CachePriorityGuard::Lock & lk)>;
void setFinalizeEvictionFunc(FinalizeEvictionFunc && func) { finalize_eviction_func = std::move(func); }
@ -35,6 +41,7 @@ private:
size_t candidates_size = 0;
FinalizeEvictionFunc finalize_eviction_func;
std::vector<IFileCachePriority::IteratorPtr> queue_entries_to_invalidate;
std::vector<IFileCachePriority::HoldSpacePtr> hold_space;
};
using EvictionCandidatesPtr = std::unique_ptr<EvictionCandidates>;

View File

@ -64,16 +64,16 @@ void FileCacheReserveStat::update(size_t size, FileSegmentKind kind, bool releas
auto & local_stat = stat_by_kind[kind];
if (releasable)
{
stat.releasable_size += size;
++stat.releasable_count;
total_stat.releasable_size += size;
++total_stat.releasable_count;
local_stat.releasable_size += size;
++local_stat.releasable_count;
}
else
{
stat.non_releasable_size += size;
++stat.non_releasable_count;
total_stat.non_releasable_size += size;
++total_stat.non_releasable_count;
local_stat.non_releasable_size += size;
++local_stat.non_releasable_count;
@ -833,14 +833,10 @@ bool FileCache::tryReserve(
}
EvictionCandidates eviction_candidates;
bool reached_size_limit = false;
bool reached_elements_limit = false;
if (query_priority)
{
if (!query_priority->collectCandidatesForEviction(
size, reserve_stat, eviction_candidates, {}, user.user_id,
reached_size_limit, reached_elements_limit, cache_lock))
size, reserve_stat, eviction_candidates, {}, user.user_id, cache_lock))
{
return false;
}
@ -858,8 +854,7 @@ bool FileCache::tryReserve(
chassert(!queue_iterator || file_segment.getReservedSize() > 0);
if (!main_priority->collectCandidatesForEviction(
size, reserve_stat, eviction_candidates, queue_iterator, user.user_id,
reached_size_limit, reached_elements_limit, cache_lock))
size, reserve_stat, eviction_candidates, queue_iterator, user.user_id, cache_lock))
{
return false;
}
@ -869,35 +864,6 @@ bool FileCache::tryReserve(
if (eviction_candidates.size() > 0)
{
chassert(reached_size_limit || reached_elements_limit);
/// If we did not reach size limit (it means we reached only elements limit here)
/// then we need to make sure that this fact that we fit in cache by size
/// remains true after we release the lock and take it again.
/// For this purpose we create a HoldSpace holder which makes sure that the space is hold in the meantime.
/// We subtract reserve_stat.stat.releasable_size from the hold space,
/// because it is the space that will be released, so we do not need to take it into account.
const size_t hold_size = reached_size_limit
? size > reserve_stat.stat.releasable_size ? size - reserve_stat.stat.releasable_size : 0
: size;
/// If we reached the elements limit - we will evict at least 1 element,
/// then we do not need to hold anything, otherwise (if we reached limit only by size)
/// we will also evict at least one element, so hold elements count is awlays zero here.
std::unique_ptr<IFileCachePriority::HoldSpace> hold_space;
if (hold_size)
{
const auto queue_entry_type = queue_iterator
? queue_iterator->getType()
: main_priority->getDefaultQueueEntryType();
hold_space = std::make_unique<IFileCachePriority::HoldSpace>(
hold_size, /* hold_elements */0, queue_entry_type, *main_priority, cache_lock);
}
LOG_TEST(log, "Eviction candidates: {}, hold space: {}. {}",
eviction_candidates.size(), hold_size, main_priority->getStateInfoForLog(cache_lock));
cache_lock.unlock();
try
{

View File

@ -49,14 +49,14 @@ struct FileCacheReserveStat
bool reached_elements_limit = false;
bool reached_size_limit = false;
Stat stat;
Stat total_stat;
std::unordered_map<FileSegmentKind, Stat> stat_by_kind;
void update(size_t size, FileSegmentKind kind, bool releasable);
FileCacheReserveStat & operator +=(const FileCacheReserveStat & other)
{
stat += other.stat;
total_stat += other.total_stat;
for (const auto & [name, stat_] : other.stat_by_kind)
stat_by_kind[name] += stat_;
return *this;

View File

@ -13,6 +13,7 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
}
IFileCachePriority::IFileCachePriority(size_t max_size_, size_t max_elements_)
@ -51,4 +52,13 @@ void IFileCachePriority::check(const CachePriorityGuard::Lock & lock) const
}
}
void IFileCachePriority::holdImpl(size_t, size_t, const CachePriorityGuard::Lock &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method holdImpl() is not implemented");
}
void IFileCachePriority::releaseImpl(size_t, size_t)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method holdImpl() is not implemented");
}
}

View File

@ -105,8 +105,6 @@ public:
virtual size_t getElementsCountApprox() const = 0;
virtual QueueEntryType getDefaultQueueEntryType() const = 0;
virtual std::string getStateInfoForLog(const CachePriorityGuard::Lock &) const = 0;
virtual void check(const CachePriorityGuard::Lock &) const;
@ -146,8 +144,6 @@ public:
EvictionCandidates & res,
IteratorPtr reservee,
const UserID & user_id,
bool & reached_size_limit,
bool & reached_elements_limit,
const CachePriorityGuard::Lock &) = 0;
virtual bool modifySizeLimits(size_t max_size_, size_t max_elements_, double size_ratio_, const CachePriorityGuard::Lock &) = 0;
@ -157,12 +153,11 @@ public:
HoldSpace(
size_t size_,
size_t elements_,
QueueEntryType queue_entry_type_,
IFileCachePriority & priority_,
const CachePriorityGuard::Lock & lock)
: size(size_), elements(elements_), queue_entry_type(queue_entry_type_), priority(priority_)
: size(size_), elements(elements_), priority(priority_)
{
priority.holdImpl(size, elements, queue_entry_type, lock);
priority.holdImpl(size, elements, lock);
}
void release()
@ -170,7 +165,7 @@ public:
if (released)
return;
released = true;
priority.releaseImpl(size, elements, queue_entry_type);
priority.releaseImpl(size, elements);
}
~HoldSpace()
@ -182,11 +177,10 @@ public:
private:
const size_t size;
const size_t elements;
const QueueEntryType queue_entry_type;
IFileCachePriority & priority;
bool released = false;
};
HoldSpace takeHold();
using HoldSpacePtr = std::unique_ptr<HoldSpace>;
protected:
IFileCachePriority(size_t max_size_, size_t max_elements_);
@ -194,10 +188,9 @@ protected:
virtual void holdImpl(
size_t size,
size_t elements,
QueueEntryType queue_entry_type,
const CachePriorityGuard::Lock & lock) = 0;
const CachePriorityGuard::Lock & lock);
virtual void releaseImpl(size_t size, size_t elements, QueueEntryType queue_entry_type) = 0;
virtual void releaseImpl(size_t size, size_t elements);
size_t max_size = 0;
size_t max_elements = 0;

View File

@ -37,7 +37,7 @@ LRUFileCachePriority::LRUFileCachePriority(
const std::string & description_)
: IFileCachePriority(max_size_, max_elements_)
, description(description_)
, log(getLogger("LRUFileCachePriority{}" + (description.empty() ? "" : "(" + description + ")")))
, log(getLogger("LRUFileCachePriority" + (description.empty() ? "" : "(" + description + ")")))
{
if (state_)
state = state_;
@ -231,7 +231,7 @@ bool LRUFileCachePriority::canFit( /// NOLINT
IteratorPtr,
bool) const
{
return canFit(size, elements, 0, 0, nullptr, nullptr, lock);
return canFit(size, elements, 0, 0, lock);
}
bool LRUFileCachePriority::canFit(
@ -239,33 +239,24 @@ bool LRUFileCachePriority::canFit(
size_t elements,
size_t released_size_assumption,
size_t released_elements_assumption,
bool * reached_size_limit,
bool * reached_elements_limit,
const CachePriorityGuard::Lock &) const
{
const bool size_limit_satisifed = max_size == 0 || state->current_size + size - released_size_assumption <= max_size;
const bool elements_limit_satisfied = max_elements == 0 || state->current_elements_num + elements - released_elements_assumption <= max_elements;
if (reached_size_limit)
*reached_size_limit |= !size_limit_satisifed;
if (reached_elements_limit)
*reached_elements_limit |= !elements_limit_satisfied;
return size_limit_satisifed && elements_limit_satisfied;
return (max_size == 0 || state->current_size + size - released_size_assumption <= max_size)
&& (max_elements == 0 || state->current_elements_num + elements - released_elements_assumption <= max_elements);
}
bool LRUFileCachePriority::collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr,
IFileCachePriority::IteratorPtr /* reservee */,
const UserID &,
bool & reached_size_limit,
bool & reached_elements_limit,
const CachePriorityGuard::Lock & lock)
{
if (canFit(size, 1, 0, 0, &reached_size_limit, &reached_elements_limit, lock))
if (canFit(size, 1, 0, 0, lock))
{
return true;
}
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
@ -290,7 +281,7 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
auto can_fit = [&]
{
return canFit(size, 1, stat.stat.releasable_size, stat.stat.releasable_count, nullptr, nullptr, lock);
return canFit(size, 1, stat.total_stat.releasable_size, stat.total_stat.releasable_count, lock);
};
iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
@ -298,7 +289,31 @@ bool LRUFileCachePriority::collectCandidatesForEviction(
return can_fit() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata);
}, lock);
return can_fit();
if (can_fit())
{
/// If we did not reach size limit (it means we reached only elements limit here)
/// then we need to make sure that this fact that we fit in cache by size
/// remains true after we release the lock and take it again.
/// For this purpose we create a HoldSpace holder which makes sure that the space is hold in the meantime.
/// We subtract reserve_stat.stat.releasable_size from the hold space,
/// because it is the space that will be released, so we do not need to take it into account.
const size_t hold_size = size > stat.total_stat.releasable_size
? size - stat.total_stat.releasable_size
: 0;
if (hold_size)
{
/// If we reached the elements limit - we will evict at least 1 element,
/// then we do not need to hold anything, otherwise (if we reached limit only by size)
/// we will also evict at least one element, so hold elements count is awlays zero here.
res.setSpaceHolder(hold_size, 0, *this, lock);
}
return true;
}
else
{
return false;
}
}
LRUFileCachePriority::LRUIterator
@ -487,7 +502,6 @@ std::string LRUFileCachePriority::getStateInfoForLog(const CachePriorityGuard::L
void LRUFileCachePriority::holdImpl(
size_t size,
size_t elements,
QueueEntryType /* queue_entry_type */,
const CachePriorityGuard::Lock & lock)
{
if (!canFit(size, elements, lock))
@ -501,12 +515,16 @@ void LRUFileCachePriority::holdImpl(
state->current_size += size;
state->current_elements_num += elements;
LOG_TEST(log, "Hold {} by size and {} by elements", size, elements);
}
void LRUFileCachePriority::releaseImpl(size_t size, size_t elements, QueueEntryType /* queue_entry_type */)
void LRUFileCachePriority::releaseImpl(size_t size, size_t elements)
{
state->current_size -= size;
state->current_elements_num -= elements;
LOG_TEST(log, "Released {} by size and {} by elements", size, elements);
}
}

View File

@ -36,8 +36,6 @@ public:
size_t getElementsCountApprox() const override { return state->current_elements_num; }
QueueEntryType getDefaultQueueEntryType() const override { return FileCacheQueueEntryType::LRU; }
std::string getStateInfoForLog(const CachePriorityGuard::Lock & lock) const override;
bool canFit( /// NOLINT
@ -61,8 +59,6 @@ public:
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
const UserID & user_id,
bool & reached_size_limit,
bool & reached_elements_limit,
const CachePriorityGuard::Lock &) override;
void shuffle(const CachePriorityGuard::Lock &) override;
@ -97,8 +93,6 @@ private:
size_t elements,
size_t released_size_assumption,
size_t released_elements_assumption,
bool * reached_size_limit,
bool * reached_elements_limit,
const CachePriorityGuard::Lock &) const;
LRUQueue::iterator remove(LRUQueue::iterator it, const CachePriorityGuard::Lock &);
@ -118,10 +112,9 @@ private:
void holdImpl(
size_t size,
size_t elements,
QueueEntryType queue_entry_type,
const CachePriorityGuard::Lock & lock) override;
void releaseImpl(size_t size, size_t elements, QueueEntryType queue_entry_type) override;
void releaseImpl(size_t size, size_t elements) override;
};
class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator

View File

@ -136,8 +136,6 @@ bool SLRUFileCachePriority::collectCandidatesForEviction(
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
const UserID & user_id,
bool & reached_size_limit,
bool & reached_elements_limit,
const CachePriorityGuard::Lock & lock)
{
/// If `it` is nullptr, then it is the first space reservation attempt
@ -145,7 +143,7 @@ bool SLRUFileCachePriority::collectCandidatesForEviction(
if (!reservee)
{
return probationary_queue.collectCandidatesForEviction(
size, stat, res, reservee, user_id, reached_size_limit, reached_elements_limit, lock);
size, stat, res, reservee, user_id, lock);
}
/// If `it` not nullptr (e.g. is already in some queue),
@ -154,7 +152,7 @@ bool SLRUFileCachePriority::collectCandidatesForEviction(
if (!assert_cast<SLRUIterator *>(reservee.get())->is_protected)
{
return probationary_queue.collectCandidatesForEviction(
size, stat, res, reservee, user_id, reached_size_limit, reached_elements_limit, lock);
size, stat, res, reservee, user_id, lock);
}
/// Entry is in protected queue.
@ -173,68 +171,28 @@ bool SLRUFileCachePriority::collectCandidatesForEviction(
auto downgrade_candidates = std::make_shared<EvictionCandidates>();
if (!protected_queue.collectCandidatesForEviction(
size, stat, *downgrade_candidates, reservee,
user_id, reached_size_limit, reached_elements_limit, lock))
size, stat, *downgrade_candidates, reservee, user_id, lock))
{
return false;
}
chassert(downgrade_candidates->size() > 0);
const size_t size_to_downgrade = stat.stat.releasable_size;
const size_t elements_to_downgrade = stat.stat.releasable_count;
const size_t size_to_downgrade = stat.total_stat.releasable_size;
chassert(size_to_downgrade);
FileCacheReserveStat probationary_stat;
bool downgrade_reached_size_limit = false;
bool downgrade_reached_elements_limit = false;
if (!probationary_queue.collectCandidatesForEviction(
size_to_downgrade, probationary_stat, res, reservee, user_id,
downgrade_reached_size_limit, downgrade_reached_elements_limit, lock))
size_to_downgrade, probationary_stat, res, reservee, user_id, lock))
{
return false;
}
const size_t size_to_evict_from_probationary = probationary_stat.stat.releasable_size;
const size_t elements_to_evict_from_probationary = probationary_stat.stat.releasable_count;
std::shared_ptr<HoldSpace> hold_space;
const bool downgrade_after_eviction = res.size() > 0;
if (downgrade_after_eviction)
auto downgrade_func = [=, this](const CachePriorityGuard::Lock & lk)
{
const size_t hold_size = downgrade_reached_size_limit
? size_to_downgrade > size_to_evict_from_probationary ? size_to_downgrade - size_to_evict_from_probationary : 0
: size_to_downgrade;
const size_t hold_elements = downgrade_reached_elements_limit
? elements_to_downgrade > elements_to_evict_from_probationary ? elements_to_downgrade - elements_to_evict_from_probationary : 0
: elements_to_downgrade;
hold_space = std::make_shared<HoldSpace>(
hold_size, hold_elements, QueueEntryType::SLRU_Probationary, probationary_queue, lock);
LOG_TEST(log, "Eviction candidates: {}, hold space: {} in size and {} in elements. {}",
res.size(), hold_size, hold_elements, getStateInfoForLog(lock));
}
else
{
LOG_TEST(log, "Eviction candidates: {}, hold space: none. {}",
res.size(), getStateInfoForLog(lock));
}
auto downgrade_func = [=, this]
(const CachePriorityGuard::Lock & lk)
{
if (hold_space)
hold_space->release();
LOG_TEST(log, "Downgrading {} elements from protected to probationary. "
"Total size: {}",
downgrade_candidates->size(), stat.stat.releasable_size);
downgrade_candidates->size(), stat.total_stat.releasable_size);
for (const auto & [key, key_candidates] : *downgrade_candidates)
{
@ -243,7 +201,7 @@ bool SLRUFileCachePriority::collectCandidatesForEviction(
}
};
if (downgrade_after_eviction)
if (res.size() > 0)
{
/// Downgrade from protected to probationary only after
/// we free up space in probationary (in order to fit these downgrade candidates).
@ -309,12 +267,8 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
EvictionCandidates downgrade_candidates;
FileCacheReserveStat downgrade_stat;
bool reached_size_limit_noop;
bool reached_elements_limit_noop;
if (!protected_queue.collectCandidatesForEviction(
entry_size, downgrade_stat, downgrade_candidates, {}, "",
reached_size_limit_noop, reached_elements_limit_noop, lock))
entry_size, downgrade_stat, downgrade_candidates, {}, "", lock))
{
/// We cannot make space for entry to be moved to protected queue
/// (not enough releasable file segments).
@ -323,8 +277,8 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
return;
}
const size_t downgrade_size = downgrade_stat.stat.releasable_size;
const size_t downgrade_count = downgrade_stat.stat.releasable_count;
const size_t downgrade_size = downgrade_stat.total_stat.releasable_size;
const size_t downgrade_count = downgrade_stat.total_stat.releasable_count;
/// Then we need to free up space in probationary for downgrade candidates,
/// but we take into account that we'll remove reservee from probationary
@ -343,8 +297,7 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
if (size_to_free)
{
if (!probationary_queue.collectCandidatesForEviction(
size_to_free, stat, eviction_candidates, {}, {},
reached_size_limit_noop, reached_elements_limit_noop, lock))
size_to_free, stat, eviction_candidates, {}, {}, lock))
{
/// "downgrade" candidates cannot be moved to probationary queue,
/// so entry cannot be moved to protected queue as well.
@ -384,6 +337,8 @@ void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const Cach
downgrade(candidate->getQueueIterator(), lock);
}
downgrade_candidates.finalize(nullptr, lock);
iterator.lru_iterator = protected_queue.add(entry, lock);
iterator.is_protected = true;
}
@ -470,50 +425,6 @@ void SLRUFileCachePriority::SLRUIterator::assertValid() const
lru_iterator.assertValid();
}
void SLRUFileCachePriority::holdImpl(
size_t size,
size_t elements,
QueueEntryType queue_entry_type,
const CachePriorityGuard::Lock & lock)
{
switch (queue_entry_type)
{
case QueueEntryType::SLRU_Protected:
{
protected_queue.holdImpl(size, elements, queue_entry_type, lock);
break;
}
case QueueEntryType::SLRU_Probationary:
{
probationary_queue.holdImpl(size, elements, queue_entry_type, lock);
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected queue entry type: {}", queue_entry_type);
}
}
void SLRUFileCachePriority::releaseImpl(size_t size, size_t elements, QueueEntryType queue_entry_type)
{
switch (queue_entry_type)
{
case QueueEntryType::SLRU_Protected:
{
protected_queue.releaseImpl(size, elements, queue_entry_type);
break;
}
case QueueEntryType::SLRU_Probationary:
{
probationary_queue.releaseImpl(size, elements, queue_entry_type);
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected queue entry type: {}", queue_entry_type);
}
}
std::string SLRUFileCachePriority::getStateInfoForLog(const CachePriorityGuard::Lock & lock) const
{
return fmt::format("total size {}/{}, elements {}/{}, "

View File

@ -29,8 +29,6 @@ public:
size_t getElementsCountApprox() const override;
QueueEntryType getDefaultQueueEntryType() const override { return FileCacheQueueEntryType::SLRU_Probationary; }
std::string getStateInfoForLog(const CachePriorityGuard::Lock & lock) const override;
void check(const CachePriorityGuard::Lock &) const override;
@ -56,8 +54,6 @@ public:
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
const UserID & user_id,
bool & reached_size_limit,
bool & reached_elements_limit,
const CachePriorityGuard::Lock &) override;
void shuffle(const CachePriorityGuard::Lock &) override;
@ -74,14 +70,6 @@ private:
void increasePriority(SLRUIterator & iterator, const CachePriorityGuard::Lock & lock);
void holdImpl(
size_t size,
size_t elements,
QueueEntryType queue_entry_type,
const CachePriorityGuard::Lock & lock) override;
void releaseImpl(size_t size, size_t elements, QueueEntryType queue_entry_type) override;
void downgrade(IteratorPtr iterator, const CachePriorityGuard::Lock &);
};