ClickHouse/src/Common/LRUCache.h
Robert Schulze 5a4f21c50f
Support for Clang Thread Safety Analysis (TSA)
- TSA is a static analyzer build by Google which finds race conditions
  and deadlocks at compile time.

- It works by associating a shared member variable with a
  synchronization primitive that protects it. The compiler can then
  check at each access if proper locking happened before. A good
  introduction are [0] and [1].

- TSA requires some help by the programmer via annotations. Luckily,
  LLVM's libcxx already has annotations for std::mutex, std::lock_guard,
  std::shared_mutex and std::scoped_lock. This commit enables them
  (--> contrib/libcxx-cmake/CMakeLists.txt).

- Further, this commit adds convenience macros for the low-level
  annotations for use in ClickHouse (--> base/defines.h). For
  demonstration, they are leveraged in a few places.

- As we compile with "-Wall -Wextra -Weverything", the required compiler
  flag "-Wthread-safety-analysis" was already enabled. Negative checks
  are an experimental feature of TSA and disabled
  (--> cmake/warnings.cmake). Compile times did not increase noticeably.

- TSA is used in a few places with simple locking. I tried TSA also
  where locking is more complex. The problem was usually that it is
  unclear which data is protected by which lock :-(. But there was
  definitely some weird code where locking looked broken. So there is
  some potential to find bugs.

*** Limitations of TSA besides the ones listed in [1]:

- The programmer needs to know which lock protects which piece of shared
  data. This is not always easy for large classes.

- Two synchronization primitives used in ClickHouse are not annotated in
  libcxx:
  (1) std::unique_lock: A releaseable lock handle often together with
      std::condition_variable, e.g. in solve producer-consumer problems.
  (2) std::recursive_mutex: A re-entrant mutex variant. Its usage can be
      considered a design flaw + typically it is slower than a standard
      mutex. In this commit, one std::recursive_mutex was converted to
      std::mutex and annotated with TSA.

- For free-standing functions (e.g. helper functions) which are passed
  shared data members, it can be tricky to specify the associated lock.
  This is because the annotations use the normal C++ rules for symbol
  resolution.

[0] https://clang.llvm.org/docs/ThreadSafetyAnalysis.html
[1] https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/42958.pdf
2022-06-20 16:13:25 +02:00

368 lines
10 KiB
C++

#pragma once
#include <unordered_map>
#include <list>
#include <memory>
#include <chrono>
#include <mutex>
#include <atomic>
#include <Common/logger_useful.h>
#include <base/defines.h>
namespace DB
{
template <typename T>
struct TrivialWeightFunction
{
size_t operator()(const T &) const
{
return 1;
}
};
/// Thread-safe cache that evicts entries which are not used for a long time.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Value weight should not change after insertion.
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class LRUCache
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
/** Initialize LRUCache with max_size and max_elements_size.
* max_elements_size == 0 means no elements size restrictions.
*/
explicit LRUCache(size_t max_size_, size_t max_elements_size_ = 0)
: max_size(std::max(static_cast<size_t>(1), max_size_))
, max_elements_size(max_elements_size_)
{}
MappedPtr get(const Key & key)
{
std::lock_guard lock(mutex);
auto res = getImpl(key);
if (res)
++hits;
else
++misses;
return res;
}
void set(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
setImpl(key, mapped);
}
void remove(const Key & key)
{
std::lock_guard lock(mutex);
auto it = cells.find(key);
if (it == cells.end())
return;
auto & cell = it->second;
current_size -= cell.size;
queue.erase(cell.queue_iterator);
cells.erase(it);
}
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
/// produce it, saves the result in the cache and returns it.
/// Only one of several concurrent threads calling getOrSet() will call load_func(),
/// others will wait for that call to complete and will use its result (this helps prevent cache stampede).
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its load_func etc.
///
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
template <typename LoadFunc>
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{
InsertTokenHolder token_holder;
{
std::lock_guard cache_lock(mutex);
auto val = getImpl(key);
if (val)
{
++hits;
return std::make_pair(val, false);
}
auto & token = insert_tokens[key];
if (!token)
token = std::make_shared<InsertToken>(*this);
token_holder.acquire(&key, token, cache_lock);
}
InsertToken * token = token_holder.token.get();
std::lock_guard token_lock(token->mutex);
token_holder.cleaned_up = token->cleaned_up;
if (token->value)
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
return std::make_pair(token->value, false);
}
++misses;
token->value = load_func();
std::lock_guard cache_lock(mutex);
/// Insert the new value only if the token is still in present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
bool result = false;
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
setImpl(key, token->value);
result = true;
}
if (!token->cleaned_up)
token_holder.cleanup(token_lock, cache_lock);
return std::make_pair(token->value, result);
}
void getStats(size_t & out_hits, size_t & out_misses) const
{
std::lock_guard lock(mutex);
out_hits = hits;
out_misses = misses;
}
size_t weight() const
{
std::lock_guard lock(mutex);
return current_size;
}
size_t count() const
{
std::lock_guard lock(mutex);
return cells.size();
}
size_t maxSize() const
{
return max_size;
}
void reset()
{
std::lock_guard lock(mutex);
queue.clear();
cells.clear();
insert_tokens.clear();
current_size = 0;
hits = 0;
misses = 0;
}
virtual ~LRUCache() = default;
protected:
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
struct Cell
{
MappedPtr value;
size_t size;
LRUQueueIterator queue_iterator;
};
using Cells = std::unordered_map<Key, Cell, HashFunction>;
Cells cells TSA_GUARDED_BY(mutex);
mutable std::mutex mutex;
private:
/// Represents pending insertion attempt.
struct InsertToken
{
explicit InsertToken(LRUCache & cache_) : cache(cache_) {}
std::mutex mutex;
bool cleaned_up TSA_GUARDED_BY(mutex) = false;
MappedPtr value TSA_GUARDED_BY(mutex);
LRUCache & cache;
size_t refcount = 0; /// Protected by the cache mutex
};
using InsertTokenById = std::unordered_map<Key, std::shared_ptr<InsertToken>, HashFunction>;
/// This class is responsible for removing used insert tokens from the insert_tokens map.
/// Among several concurrent threads the first successful one is responsible for removal. But if they all
/// fail, then the last one is responsible.
struct InsertTokenHolder
{
const Key * key = nullptr;
std::shared_ptr<InsertToken> token;
bool cleaned_up = false;
InsertTokenHolder() = default;
void acquire(const Key * key_, const std::shared_ptr<InsertToken> & token_, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled only because we can't reference the parent-level cache mutex from here
{
key = key_;
token = token_;
++token->refcount;
}
void cleanup([[maybe_unused]] std::lock_guard<std::mutex> & token_lock, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled only because we can't reference the parent-level cache mutex from here
{
token->cache.insert_tokens.erase(*key);
token->cleaned_up = true;
cleaned_up = true;
}
~InsertTokenHolder()
{
if (!token)
return;
if (cleaned_up)
return;
std::lock_guard token_lock(token->mutex);
if (token->cleaned_up)
return;
std::lock_guard cache_lock(token->cache.mutex);
--token->refcount;
if (token->refcount == 0)
cleanup(token_lock, cache_lock);
}
};
friend struct InsertTokenHolder;
InsertTokenById insert_tokens TSA_GUARDED_BY(mutex);
LRUQueue queue TSA_GUARDED_BY(mutex);
/// Total weight of values.
size_t current_size TSA_GUARDED_BY(mutex) = 0;
const size_t max_size;
const size_t max_elements_size;
std::atomic<size_t> hits {0};
std::atomic<size_t> misses {0};
const WeightFunction weight_function;
MappedPtr getImpl(const Key & key) TSA_REQUIRES(mutex)
{
auto it = cells.find(key);
if (it == cells.end())
{
return MappedPtr();
}
Cell & cell = it->second;
/// Move the key to the end of the queue. The iterator remains valid.
queue.splice(queue.end(), queue, cell.queue_iterator);
return cell.value;
}
void setImpl(const Key & key, const MappedPtr & mapped) TSA_REQUIRES(mutex)
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple());
Cell & cell = it->second;
if (inserted)
{
try
{
cell.queue_iterator = queue.insert(queue.end(), key);
}
catch (...)
{
cells.erase(it);
throw;
}
}
else
{
current_size -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
cell.value = mapped;
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
removeOverflow();
}
void removeOverflow() TSA_REQUIRES(mutex)
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();
while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1))
{
const Key & key = queue.front();
auto it = cells.find(key);
if (it == cells.end())
{
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
const auto & cell = it->second;
current_size -= cell.size;
current_weight_lost += cell.size;
cells.erase(it);
queue.pop_front();
--queue_size;
}
onRemoveOverflowWeightLoss(current_weight_lost);
if (current_size > (1ull << 63))
{
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
}
/// Override this method if you want to track how much weight was lost in removeOverflow method.
virtual void onRemoveOverflowWeightLoss(size_t /*weight_loss*/) {}
};
}