/// ClickHouse/src/Common/LRUCache.h

#pragma once
#include <unordered_map>
#include <list>
#include <memory>
#include <chrono>
#include <mutex>
#include <atomic>
#include <base/logger_useful.h>
namespace DB
{
template <typename T>
struct TrivialWeightFunction
{
size_t operator()(const T &) const
{
return 1;
}
};
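
/// A custom WeightFunction only needs a matching operator(). A minimal sketch, assuming
/// string values whose weight is their length (not part of this header):
///
///     struct StringWeightFunction
///     {
///         size_t operator()(const std::string & s) const { return s.size(); }
///     };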
template <typename T>
struct TrivialLRUCacheEvictPolicy
{
inline bool canRelease(std::shared_ptr<T>) const
{
return true;
}
inline void release(std::shared_ptr<T>)
{
}
};
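
/// An EvictPolicy can veto the eviction of entries that must stay alive. A minimal sketch,
/// assuming a hypothetical Pinnable value type with a `pinned` flag (not part of this header):
///
///     struct PinnedEvictPolicy
///     {
///         /// Entries marked as pinned may not be evicted or overwritten.
///         bool canRelease(std::shared_ptr<Pinnable> value) const { return !value || !value->pinned; }
///         void release(std::shared_ptr<Pinnable>) {}
///     };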
/// Thread-safe cache that evicts entries which are not used for a long time.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Value weight should not change after insertion.
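///
/// Example usage (a minimal sketch): with the default TrivialWeightFunction every entry
/// weighs 1, so max_size effectively bounds the number of elements:
///
///     LRUCache<std::string, std::string> cache(/* max_size_ = */ 1024);
///     cache.set("key", std::make_shared<std::string>("value"));
///     if (auto value = cache.get("key")) /// get() returns nullptr on a miss
///         std::cout << *value << '\n';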
template <typename TKey,
typename TMapped,
typename HashFunction = std::hash<TKey>,
typename WeightFunction = TrivialWeightFunction<TMapped>,
typename EvictPolicy = TrivialLRUCacheEvictPolicy<TMapped>>
class LRUCache
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
struct Result
{
MappedPtr value;
// True when this call missed the cache and the value was produced by load_func in getOrSet().
bool cache_miss = true;
// False by default; becomes true when the value produced by load_func in getOrSet()
// is successfully stored in the cache by setImpl().
bool set_successful = false;
};
/** Initialize LRUCache with max_size and max_elements_size.
* max_elements_size == 0 means no limit on the number of elements.
*/
LRUCache(size_t max_size_, size_t max_elements_size_ = 0)
: max_size(std::max(static_cast<size_t>(1), max_size_))
, max_elements_size(max_elements_size_)
{}
MappedPtr get(const Key & key)
{
std::lock_guard lock(mutex);
auto res = getImpl(key, lock);
if (res)
++hits;
else
++misses;
return res;
}
void set(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
setImpl(key, mapped, lock);
}
/**
* trySet() will fail (return false) if there is no space left and no keys can be evicted.
* Whether a key may be evicted is decided by EvictPolicy; the default policy permits evicting any key.
*/
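///
/// For example (a sketch, assuming an EvictPolicy such as the PinnedEvictPolicy above):
///
///     if (!cache.trySet(key, value))
///     {
///         /// The cache is full and nothing could be evicted; fall back to not caching.
///     }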
bool trySet(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
return setImpl(key, mapped, lock);
}
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
/// produce it, saves the result in the cache and returns it.
/// Only one of several concurrent threads calling getOrSet() will call load_func(),
/// others will wait for that call to complete and will use its result (this helps prevent cache stampede).
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its own load_func, and so on.
/// Returns a Result holding the value, whether this call was a cache miss that loaded the value,
/// and whether a freshly loaded value was successfully stored in the cache.
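///
/// Example (a minimal sketch; Value, loadFromDisk and use are hypothetical):
///
///     auto result = cache.getOrSet(key, [&] { return std::make_shared<Value>(loadFromDisk(key)); });
///     use(*result.value);
///     /// result.cache_miss is true when this call produced the value via load_func.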
template <typename LoadFunc>
Result getOrSet(const Key & key, LoadFunc && load_func)
{
InsertTokenHolder token_holder;
{
std::lock_guard cache_lock(mutex);
auto val = getImpl(key, cache_lock);
if (val)
{
++hits;
return {val, false, false};
}
auto & token = insert_tokens[key];
if (!token)
token = std::make_shared<InsertToken>(*this);
token_holder.acquire(&key, token, cache_lock);
}
InsertToken * token = token_holder.token.get();
std::lock_guard token_lock(token->mutex);
token_holder.cleaned_up = token->cleaned_up;
if (token->value)
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
return {token->value, false, false};
}
++misses;
token->value = load_func();
std::lock_guard cache_lock(mutex);
/// Insert the new value only if the token is still present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
bool is_value_loaded = false;
bool is_value_loaded_and_set = false;
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
is_value_loaded_and_set = setImpl(key, token->value, cache_lock);
is_value_loaded = true;
}
if (!token->cleaned_up)
token_holder.cleanup(token_lock, cache_lock);
return {token->value, is_value_loaded, is_value_loaded_and_set};
}
/// Returns true if the key is not in the cache, or if the element could be released; returns false otherwise.
bool tryRemove(const Key & key)
{
std::lock_guard lock(mutex);
auto it = cells.find(key);
if (it == cells.end())
return true;
auto & cell = it->second;
if (!evict_policy.canRelease(cell.value))
return false;
evict_policy.release(cell.value);
current_size -= cell.size;
cells.erase(it);
queue.erase(cell.queue_iterator);
return true;
}
void getStats(size_t & out_hits, size_t & out_misses) const
{
std::lock_guard lock(mutex);
out_hits = hits;
out_misses = misses;
}
size_t weight() const
{
std::lock_guard lock(mutex);
return current_size;
}
size_t count() const
{
std::lock_guard lock(mutex);
return cells.size();
}
size_t maxSize() const
{
return max_size;
}
void reset()
{
std::lock_guard lock(mutex);
queue.clear();
cells.clear();
insert_tokens.clear();
current_size = 0;
hits = 0;
misses = 0;
}
virtual ~LRUCache() {}
protected:
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
struct Cell
{
MappedPtr value;
size_t size;
LRUQueueIterator queue_iterator;
};
using Cells = std::unordered_map<Key, Cell, HashFunction>;
Cells cells;
mutable std::mutex mutex;
private:
/// Represents pending insertion attempt.
struct InsertToken
{
explicit InsertToken(LRUCache & cache_) : cache(cache_) {}
std::mutex mutex;
bool cleaned_up = false; /// Protected by the token mutex
MappedPtr value; /// Protected by the token mutex
LRUCache & cache;
size_t refcount = 0; /// Protected by the cache mutex
};
using InsertTokenById = std::unordered_map<Key, std::shared_ptr<InsertToken>, HashFunction>;
/// This class is responsible for removing used insert tokens from the insert_tokens map.
/// Among several concurrent threads the first successful one is responsible for removal. But if they all
/// fail, then the last one is responsible.
struct InsertTokenHolder
{
const Key * key = nullptr;
std::shared_ptr<InsertToken> token;
bool cleaned_up = false;
InsertTokenHolder() = default;
void acquire(const Key * key_, const std::shared_ptr<InsertToken> & token_, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
key = key_;
token = token_;
++token->refcount;
}
void cleanup([[maybe_unused]] std::lock_guard<std::mutex> & token_lock, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
token->cache.insert_tokens.erase(*key);
token->cleaned_up = true;
cleaned_up = true;
}
~InsertTokenHolder()
{
if (!token)
return;
if (cleaned_up)
return;
std::lock_guard token_lock(token->mutex);
if (token->cleaned_up)
return;
std::lock_guard cache_lock(token->cache.mutex);
--token->refcount;
if (token->refcount == 0)
cleanup(token_lock, cache_lock);
}
};
friend struct InsertTokenHolder;
InsertTokenById insert_tokens;
LRUQueue queue;
/// Total weight of values.
size_t current_size = 0;
const size_t max_size;
const size_t max_elements_size;
std::atomic<size_t> hits {0};
std::atomic<size_t> misses {0};
WeightFunction weight_function;
EvictPolicy evict_policy;
MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
auto it = cells.find(key);
if (it == cells.end())
{
return MappedPtr();
}
Cell & cell = it->second;
/// Move the key to the end of the queue. The iterator remains valid.
queue.splice(queue.end(), queue, cell.queue_iterator);
return cell.value;
}
bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple());
Cell & cell = it->second;
auto value_weight = mapped ? weight_function(*mapped) : 0;
if (inserted)
{
try
{
cell.queue_iterator = queue.insert(queue.end(), key);
}
catch (...)
{
cells.erase(it);
throw;
}
if (!removeOverflow())
{
// The overflow was caused by inserting this element and could not be resolved; roll back the insertion.
queue.erase(cell.queue_iterator);
cells.erase(it);
return false;
}
}
else
{
if (!evict_policy.canRelease(cell.value))
return false;
if (value_weight > cell.size && !removeOverflow(value_weight - cell.size))
return false;
evict_policy.release(cell.value); // Release the old value; a no-op under the default policy.
current_size -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
cell.value = mapped;
cell.size = value_weight;
current_size += cell.size;
return true;
}
bool removeOverflow(size_t required_size_to_remove = 0)
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();
auto key_it = queue.begin();
auto is_overflow = [&] { return (current_size + required_size_to_remove > max_size || (max_elements_size != 0 && queue_size > max_elements_size)); };
while (is_overflow() && (queue_size > 1) && (key_it != queue.end()))
{
const Key & key = *key_it;
auto it = cells.find(key);
if (it == cells.end())
{
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
const auto & cell = it->second;
if (evict_policy.canRelease(cell.value)) // Always true under the default policy.
{
// Always call release() before erasing an element; a no-op under the default policy.
evict_policy.release(cell.value);
current_size -= cell.size;
current_weight_lost += cell.size;
cells.erase(it);
key_it = queue.erase(key_it);
--queue_size;
}
else
{
++key_it;
}
}
onRemoveOverflowWeightLoss(current_weight_lost);
// current_size is unsigned, so a value this large means it underflowed, i.e. the accounting is corrupted.
if (current_size > (1ull << 63))
{
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
return !is_overflow();
}
/// Override this method if you want to track how much weight was lost in the removeOverflow() method.
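/// A sketch of such an override (SizeTrackingCache is hypothetical):
///
///     class SizeTrackingCache : public LRUCache<std::string, std::string>
///     {
///     public:
///         using LRUCache::LRUCache;
///         size_t total_evicted = 0;
///     private:
///         void onRemoveOverflowWeightLoss(size_t weight_loss) override { total_evicted += weight_loss; }
///     };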
virtual void onRemoveOverflowWeightLoss(size_t /*weight_loss*/) {}
};
}