Add LRUResourceCache

1. add LRUResourceCache for managing resource cache in lru policy
2. rollback LRUCache to the original version
3. add remove() in LRUCache
4. add unit tests for LRUResourceCache and LRUCache
This commit is contained in:
lgbo-ustc 2021-12-29 15:25:33 +08:00 committed by liangjiabiao
parent 413fb290bb
commit 59cbd76880
11 changed files with 610 additions and 113 deletions

View File

@ -6,11 +6,13 @@
#include <chrono>
#include <mutex>
#include <atomic>
#include <base/logger_useful.h>
namespace DB
{
template <typename T>
struct TrivialWeightFunction
{
@ -20,30 +22,13 @@ struct TrivialWeightFunction
}
};
template <typename T>
struct TrivialLRUCacheEvictPolicy
{
inline bool canRelease(std::shared_ptr<T>) const
{
return true;
}
inline void release(std::shared_ptr<T>)
{
}
};
/// Thread-safe cache that evicts entries which are not used for a long time.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Value weight should not change after insertion.
template <typename TKey,
typename TMapped,
typename HashFunction = std::hash<TKey>,
typename WeightFunction = TrivialWeightFunction<TMapped>,
typename EvictPolicy = TrivialLRUCacheEvictPolicy<TMapped>>
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class LRUCache
{
public:
@ -51,16 +36,6 @@ public:
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
struct Result
{
MappedPtr value;
// if key is in cache, cache_miss is true
bool cache_miss = true;
// set_successful is false in default
// when value is loaded by load_fun in getOrSet(), and setImpl returns true, set_successful = true
bool set_successful = false;
};
/** Initialize LRUCache with max_size and max_elements_size.
* max_elements_size == 0 means no elements size restrictions.
*/
@ -85,18 +60,19 @@ public:
void set(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
setImpl(key, mapped, lock);
}
/**
* trySet() will fail (return false) if there is no space left and no keys could be evicted.
* Eviction permission of each key is defined by EvictPolicy. In default policy there is no restriction.
*/
bool trySet(const Key & key, const MappedPtr & mapped)
void remove(const Key & key)
{
std::lock_guard lock(mutex);
return setImpl(key, mapped, lock);
auto it = cells.find(key);
if (it == cells.end())
return;
auto & cell = it->second;
current_size -= cell.size;
cells.erase(it);
}
/// If the value for the key is in the cache, returns it. If it is not, calls load_func() to
@ -106,8 +82,9 @@ public:
/// Exceptions occurring in load_func will be propagated to the caller. Another thread from the
/// set of concurrent threads will then try to call its load_func etc.
///
/// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call.
template <typename LoadFunc>
Result getOrSet(const Key &key, LoadFunc && load_func)
std::pair<MappedPtr, bool> getOrSet(const Key & key, LoadFunc && load_func)
{
InsertTokenHolder token_holder;
{
@ -117,7 +94,7 @@ public:
if (val)
{
++hits;
return {val, false, false};
return std::make_pair(val, false);
}
auto & token = insert_tokens[key];
@ -137,7 +114,7 @@ public:
{
/// Another thread already produced the value while we waited for token->mutex.
++hits;
return {token->value, false, false};
return std::make_pair(token->value, false);
}
++misses;
@ -147,37 +124,18 @@ public:
/// Insert the new value only if the token is still in present in insert_tokens.
/// (The token may be absent because of a concurrent reset() call).
bool is_value_loaded = false;
bool is_value_loaded_and_set = false;
bool result = false;
auto token_it = insert_tokens.find(key);
if (token_it != insert_tokens.end() && token_it->second.get() == token)
{
is_value_loaded_and_set = setImpl(key, token->value, cache_lock);
is_value_loaded = true;
setImpl(key, token->value, cache_lock);
result = true;
}
if (!token->cleaned_up)
token_holder.cleanup(token_lock, cache_lock);
return {token->value, is_value_loaded, is_value_loaded_and_set};
}
/// If key is not in cache or the element can be released, return is true. otherwise, return is false
bool tryRemove(const Key & key)
{
std::lock_guard loc(mutex);
auto it = cells.find(key);
if (it == cells.end())
return true;
auto & cell = it->second;
if (!evict_policy.canRelease(cell.value))
return false;
evict_policy.release(cell.value);
current_size -= cell.size;
cells.erase(it);
queue.erase(cell.queue_iterator);
return true;
return std::make_pair(token->value, result);
}
void getStats(size_t & out_hits, size_t & out_misses) const
@ -312,7 +270,6 @@ private:
std::atomic<size_t> misses {0};
WeightFunction weight_function;
EvictPolicy evict_policy;
MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
@ -330,14 +287,13 @@ private:
return cell.value;
}
bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
void setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard<std::mutex> & cache_lock)
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple());
Cell & cell = it->second;
auto value_weight = mapped ? weight_function(*mapped) : 0;
if (inserted)
{
@ -350,42 +306,28 @@ private:
cells.erase(it);
throw;
}
if (!removeOverflow())
{
// overflow is caused by inserting this element.
queue.erase(cell.queue_iterator);
cells.erase(it);
return false;
}
}
else
{
if (!evict_policy.canRelease(cell.value))
return false;
if (value_weight > cell.size && !removeOverflow(value_weight - cell.size))
return false;
evict_policy.release(cell.value); // release the old value. this action is empty in default policy.
current_size -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
cell.value = mapped;
cell.size = value_weight;
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
return true;
removeOverflow();
}
bool removeOverflow(size_t required_size_to_remove = 0)
void removeOverflow()
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();
auto key_it = queue.begin();
auto is_overflow = [&] { return (current_size + required_size_to_remove > max_size || (max_elements_size != 0 && queue_size > max_elements_size)); };
while (is_overflow() && (queue_size > 1) && (key_it != queue.end()))
while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1))
{
const Key & key = *key_it;
const Key & key = queue.front();
auto it = cells.find(key);
if (it == cells.end())
@ -395,23 +337,13 @@ private:
}
const auto & cell = it->second;
if (evict_policy.canRelease(cell.value))// in default, it is true
{
// always call release() before erasing an element
// in default, it's an empty action
evict_policy.release(cell.value);
current_size -= cell.size;
current_weight_lost += cell.size;
current_size -= cell.size;
current_weight_lost += cell.size;
cells.erase(it);
key_it = queue.erase(key_it);
--queue_size;
}
else
{
key_it++;
}
cells.erase(it);
queue.pop_front();
--queue_size;
}
onRemoveOverflowWeightLoss(current_weight_lost);
@ -421,7 +353,6 @@ private:
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();
}
return !is_overflow();
}
/// Override this method if you want to track how much weight was lost in removeOverflow method.

View File

@ -0,0 +1,254 @@
#pragma once
#include <atomic>
#include <chrono>
#include <list>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unordered_set>
#include <base/logger_useful.h>
namespace DB
{
template <typename T>
struct TrivailLRUResourceCacheWeightFunction
{
size_t operator()(const T &) const { return 1; }
};
/*
* A resource cache with key index. There is only one instance for every key which is not like the normal resource pool.
* Resource cache has max weight capacity and keys size limitation. If the limitation is exceeded, keys would be evicted
* by LRU policy.
*
* acquire and release must be used in pair.
*/
template <
typename TKey,
typename TMapped,
typename WeightFunction = TrivailLRUResourceCacheWeightFunction<TMapped>,
typename HashFunction = std::hash<TKey>>
class LRUResourceCache
{
public:
LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { }
virtual ~LRUResourceCache() { }
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
// - load_func : when key is not exists in cache, load_func is called to generate a new key
// - return: is null when there is no more space for the new value or the old value is in used.
template <typename LoadFunc>
MappedPtr acquire(const Key & key, LoadFunc && load_func)
{
InsertToken * insert_token = nullptr;
{
std::lock_guard lock(mutex);
auto it = cells.find(key);
if (it != cells.end())
{
hits++;
it->second.reference_count += 1;
return it->second.value;
}
misses++;
insert_token = acquireInsertToken(key);
}
Cell * cell_ptr = nullptr;
{
std::lock_guard lock(insert_token->mutex);
if (!insert_token->value)
{
insert_token->value = load_func();
std::lock_guard cell_lock(mutex);
cell_ptr = insert_value(key, insert_token->value);
if (cell_ptr)
{
cell_ptr->reference_count += 1;
}
else
{
insert_token->value = nullptr;
}
}
}
std::lock_guard lock(mutex);
releaseInsertToken(key);
if (cell_ptr)
{
return cell_ptr->value;
}
return nullptr;
}
MappedPtr acquire(const Key & key)
{
std::lock_guard lock(mutex);
auto it = cells.find(key);
if (it == cells.end())
{
misses++;
return nullptr;
}
hits++;
it->second.reference_count += 1;
queue.splice(queue.end(), queue, it->second.queue_iterator);
return it->second.value;
}
// mark a reference is released
void release(const Key & key)
{
std::lock_guard lock(mutex);
auto it = cells.find(key);
if (it == cells.end() || it->second.reference_count == 0)
{
LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "try to release an invalid element");
abort();
}
it->second.reference_count -= 1;
}
// If you want to update a value, call tryRemove() at first and then call acquire() with load_func.
bool tryRemove(const Key & key)
{
std::lock_guard guard(mutex);
auto it = cells.find(key);
if (it == cells.end())
return true;
auto & cell = it->second;
if (cell.reference_count)
return false;
queue.erase(cell.queue_iterator);
current_weight -= cell.weight;
cells.erase(it);
return true;
}
size_t weight()
{
std::lock_guard lock(mutex);
return current_weight;
}
size_t size()
{
std::lock_guard lock(mutex);
return cells.size();
}
void getStats(size_t & out_hits, size_t & out_misses, size_t & out_evict_count) const
{
out_hits = hits;
out_misses = misses;
out_evict_count = evict_count;
}
private:
mutable std::mutex mutex;
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
struct Cell
{
MappedPtr value;
size_t weight = 0;
LRUQueueIterator queue_iterator;
size_t reference_count = 0;
};
using Cells = std::unordered_map<Key, Cell, HashFunction>;
Cells cells;
LRUQueue queue;
size_t current_weight = 0;
size_t max_weight = 0;
size_t max_element_size = 0;
struct InsertToken
{
std::mutex mutex;
MappedPtr value;
size_t reference_count = 0;
};
using InsertTokens = std::unordered_map<Key, InsertToken, HashFunction>;
InsertTokens insert_tokens;
WeightFunction weight_function;
std::atomic<size_t> hits{0};
std::atomic<size_t> misses{0};
std::atomic<size_t> evict_count{0};
InsertToken * acquireInsertToken(const Key & key)
{
auto & token = insert_tokens[key];
token.reference_count += 1;
return &token;
}
void releaseInsertToken(const Key & key)
{
auto it = insert_tokens.find(key);
if (it != insert_tokens.end())
{
it->second.reference_count -= 1;
if (it->second.reference_count == 0)
insert_tokens.erase(it);
}
}
// key mustn't be in the cache
Cell * insert_value(const Key & insert_key, MappedPtr value)
{
auto weight = value ? weight_function(*value) : 0;
auto queue_size = cells.size() + 1;
auto loss_weight = 0;
auto is_overflow = [&] {
return current_weight + weight - loss_weight > max_weight || (max_element_size != 0 && queue_size > max_element_size);
};
auto key_it = queue.begin();
std::unordered_set<Key, HashFunction> to_release_keys;
while (is_overflow() && queue_size > 1 && key_it != queue.end())
{
const Key & key = *key_it;
auto cell_it = cells.find(key);
if (cell_it == cells.end())
{
LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "LRUResourceCache became inconsistent. There must be a bug in it.");
abort();
}
auto & cell = cell_it->second;
if (cell.reference_count == 0)
{
loss_weight += cell.weight;
queue_size -= 1;
to_release_keys.insert(key);
}
key_it++;
}
if (is_overflow())
return nullptr;
if (loss_weight > current_weight + weight)
{
LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "LRUResourceCache became inconsistent. There must be a bug in it.");
abort();
}
for (auto & key : to_release_keys)
{
auto & cell = cells[key];
queue.erase(cell.queue_iterator);
cells.erase(key);
evict_count++;
}
current_weight = current_weight + weight - loss_weight;
auto & new_cell = cells[insert_key];
new_cell.value = value;
new_cell.weight = weight;
new_cell.queue_iterator = queue.insert(queue.end(), insert_key);
return &new_cell;
}
};
}

View File

@ -0,0 +1,102 @@
#include <iomanip>
#include <iostream>
#include <gtest/gtest.h>
#include <Common/LRUCache.h>
TEST(LRUCache, set)
{
using SimpleLRUCache =DB::LRUCache<int, int>;
auto lru_cache = SimpleLRUCache(10, 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(2,std::make_shared<int>(3));
auto w = lru_cache.weight();
auto n = lru_cache.count();
ASSERT_EQ(w, 2);
ASSERT_EQ(n, 2);
}
TEST(LRUCache, update)
{
using SimpleLRUCache =DB::LRUCache<int, int>;
auto lru_cache = SimpleLRUCache(10, 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(1,std::make_shared<int>(3));
auto val = lru_cache.get(1);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 3);
}
TEST(LRUCache, get)
{
using SimpleLRUCache =DB::LRUCache<int, int>;
auto lru_cache = SimpleLRUCache(10, 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(2, std::make_shared<int>(3));
SimpleLRUCache::MappedPtr value = lru_cache.get(1);
ASSERT_TRUE(value != nullptr);
ASSERT_EQ(*value, 2);
value = lru_cache.get(2);
ASSERT_TRUE(value != nullptr);
ASSERT_EQ(*value, 3);
}
struct ValueWeight
{
size_t operator()(const size_t & x) const
{
return x;
}
};
TEST(LRUCache, evict_on_size)
{
using SimpleLRUCache =DB::LRUCache<int, size_t>;
auto lru_cache = SimpleLRUCache(20, 3);
lru_cache.set(1, std::make_shared<size_t>(2));
lru_cache.set(2, std::make_shared<size_t>(3));
lru_cache.set(3, std::make_shared<size_t>(4));
lru_cache.set(4, std::make_shared<size_t>(5));
auto n = lru_cache.count();
ASSERT_EQ(n, 3);
auto value = lru_cache.get(1);
ASSERT_TRUE(value == nullptr);
}
TEST(LRUCache, evict_on_weight)
{
using SimpleLRUCache =DB::LRUCache<int, size_t, std::hash<int>, ValueWeight>;
auto lru_cache = SimpleLRUCache(10, 10);
lru_cache.set(1, std::make_shared<size_t>(2));
lru_cache.set(2, std::make_shared<size_t>(3));
lru_cache.set(3, std::make_shared<size_t>(4));
lru_cache.set(4, std::make_shared<size_t>(5));
auto n = lru_cache.count();
ASSERT_EQ(n, 2);
auto w = lru_cache.weight();
ASSERT_EQ(w, 9);
auto value = lru_cache.get(1);
ASSERT_TRUE(value == nullptr);
value = lru_cache.get(2);
ASSERT_TRUE(value == nullptr);
}
TEST(LRUCache, getOrSet)
{
using SimpleLRUCache =DB::LRUCache<int, size_t, std::hash<int>, ValueWeight>;
auto lru_cache = SimpleLRUCache(10, 10);
size_t x = 10;
auto load_func = [&]{ return std::make_shared<size_t>(x); };
auto [value, loaded] = lru_cache.getOrSet(1, load_func);
ASSERT_TRUE(value != nullptr);
ASSERT_TRUE(*value == 10);
}

View File

@ -0,0 +1,212 @@
#include <iomanip>
#include <iostream>
#include <gtest/gtest.h>
#include <Common/LRUResourceCache.h>
TEST(LRUResourceCache, acquire)
{
using MyCache = DB::LRUResourceCache<int, int>;
auto mcache = MyCache(10, 10);
int x = 10;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
x = 11;
val = mcache.acquire(2, load_int);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 11);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 10);
}
TEST(LRUResourceCache, remove)
{
using MyCache = DB::LRUResourceCache<int, int>;
auto mcache = MyCache(10, 10);
int x = 10;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
x = 11;
val = mcache.acquire(2, load_int);
auto succ = mcache.tryRemove(3);
ASSERT_TRUE(succ);
succ = mcache.tryRemove(1);
ASSERT_TRUE(!succ);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 10);
mcache.release(1);
succ = mcache.tryRemove(1);
ASSERT_TRUE(!succ);
mcache.release(1);
succ = mcache.tryRemove(1);
ASSERT_TRUE(succ);
val = mcache.acquire(1);
ASSERT_TRUE(val == nullptr);
}
struct MyWeight
{
size_t operator()(const int & x) const { return static_cast<size_t>(x); }
};
TEST(LRUResourceCache, evict_on_weight)
{
using MyCache = DB::LRUResourceCache<int, int, MyWeight>;
auto mcache = MyCache(5, 10);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
mcache.release(1);
val = mcache.acquire(2, load_int);
mcache.release(2);
x = 3;
val = mcache.acquire(3, load_int);
ASSERT_TRUE(val != nullptr);
auto w = mcache.weight();
ASSERT_EQ(w, 5);
auto n = mcache.size();
ASSERT_EQ(n, 2);
val = mcache.acquire(1);
ASSERT_TRUE(val == nullptr);
val = mcache.acquire(2);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(3);
ASSERT_TRUE(val != nullptr);
}
TEST(LRUResourceCache, evict_on_size)
{
using MyCache = DB::LRUResourceCache<int, int>;
auto mcache = MyCache(5, 2);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
mcache.release(1);
val = mcache.acquire(2, load_int);
mcache.release(2);
x = 3;
val = mcache.acquire(3, load_int);
ASSERT_TRUE(val != nullptr);
auto n = mcache.size();
ASSERT_EQ(n, 2);
auto w = mcache.weight();
ASSERT_EQ(w, 2);
val = mcache.acquire(1);
ASSERT_TRUE(val == nullptr);
val = mcache.acquire(2);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(3);
ASSERT_TRUE(val != nullptr);
}
TEST(LRUResourceCache, not_evict_used_element)
{
using MyCache = DB::LRUResourceCache<int, int, MyWeight>;
auto mcache = MyCache(7, 10);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
val = mcache.acquire(2, load_int);
mcache.release(2);
val = mcache.acquire(3, load_int);
mcache.release(3);
x = 3;
val = mcache.acquire(4, load_int);
ASSERT_TRUE(val != nullptr);
auto n = mcache.size();
ASSERT_EQ(n, 3);
auto w = mcache.weight();
ASSERT_EQ(w, 7);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(2);
ASSERT_TRUE(val == nullptr);
val = mcache.acquire(3);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(4);
ASSERT_TRUE(val != nullptr);
}
TEST(LRUResourceCache, acquire_fail)
{
using MyCache = DB::LRUResourceCache<int, int, MyWeight>;
auto mcache = MyCache(5, 10);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
val = mcache.acquire(2, load_int);
val = mcache.acquire(3, load_int);
ASSERT_TRUE(val == nullptr);
auto n = mcache.size();
ASSERT_EQ(n, 2);
auto w = mcache.weight();
ASSERT_EQ(w, 4);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(2);
ASSERT_TRUE(val != nullptr);
val = mcache.acquire(3);
ASSERT_TRUE(val == nullptr);
}
TEST(LRUResourceCache, dup_acquire)
{
using MyCache = DB::LRUResourceCache<int, int, MyWeight>;
auto mcache = MyCache(20, 10);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
mcache.release(1);
x = 11;
val = mcache.acquire(1, load_int);
ASSERT_TRUE(val != nullptr);
auto n = mcache.size();
ASSERT_EQ(n, 1);
auto w = mcache.weight();
ASSERT_EQ(w, 2);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 2);
}
TEST(LRUResourceCache, re_acquire)
{
using MyCache = DB::LRUResourceCache<int, int, MyWeight>;
auto mcache = MyCache(20, 10);
int x = 2;
auto load_int = [&] { return std::make_shared<int>(x); };
auto val = mcache.acquire(1, load_int);
mcache.release(1);
mcache.tryRemove(1);
x = 11;
val = mcache.acquire(1, load_int);
ASSERT_TRUE(val != nullptr);
auto n = mcache.size();
ASSERT_EQ(n, 1);
auto w = mcache.weight();
ASSERT_EQ(w, 11);
val = mcache.acquire(1);
ASSERT_TRUE(val != nullptr);
ASSERT_TRUE(*val == 11);
}

View File

@ -48,12 +48,12 @@ public:
MappedPtr getOrSet(const Key & key, LoadFunc && load)
{
auto result = Base::getOrSet(key, load);
if (result.cache_miss)
if (result.second)
ProfileEvents::increment(ProfileEvents::MMappedFileCacheMisses);
else
ProfileEvents::increment(ProfileEvents::MMappedFileCacheHits);
return result.value;
return result.first;
}
};

View File

@ -63,12 +63,12 @@ public:
{
auto result = Base::getOrSet(key, std::forward<LoadFunc>(load));
if (result.cache_miss)
if (result.second)
ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses);
else
ProfileEvents::increment(ProfileEvents::UncompressedCacheHits);
return result.value;
return result.first;
}
private:

View File

@ -354,14 +354,13 @@ void Aggregator::compileAggregateFunctionsIfNeeded()
if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
auto result = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] ()
auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] ()
{
LOG_TRACE(log, "Compile expression {}", functions_description);
auto compiled_aggregate_functions = compileAggregateFunctions(getJITInstance(), functions_to_compile, functions_description);
return std::make_shared<CompiledAggregateFunctionsHolder>(std::move(compiled_aggregate_functions));
});
auto compiled_function_cache_entry = result.value;
compiled_aggregate_functions_holder = std::static_pointer_cast<CompiledAggregateFunctionsHolder>(compiled_function_cache_entry);
}
else

View File

@ -296,13 +296,12 @@ static FunctionBasePtr compile(
if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
{
auto result = compilation_cache->getOrSet(hash_key, [&] ()
auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(hash_key, [&] ()
{
LOG_TRACE(getLogger(), "Compile expression {}", llvm_function->getName());
auto compiled_function = compileFunction(getJITInstance(), *llvm_function);
return std::make_shared<CompiledFunctionHolder>(compiled_function);
});
auto compiled_function_cache_entry = result.value;
std::shared_ptr<CompiledFunctionHolder> compiled_function_holder = std::static_pointer_cast<CompiledFunctionHolder>(compiled_function_cache_entry);
llvm_function->setCompiledFunction(std::move(compiled_function_holder));

View File

@ -1033,7 +1033,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
return std::make_shared<Block>(input.block_in->read());
};
return cached_right_blocks->getOrSet(pos, load_func).value;
return cached_right_blocks->getOrSet(pos, load_func).first;
}
else
return loaded_right_blocks[pos];

View File

@ -650,7 +650,7 @@ public:
avro::ValidSchema getSchema(uint32_t id)
{
auto [schema, loaded, _] = schema_cache.getOrSet(
auto [schema, loaded] = schema_cache.getOrSet(
id,
[this, id](){ return std::make_shared<avro::ValidSchema>(fetchSchema(id)); }
);
@ -727,7 +727,7 @@ static LRUCache<std::string, ConfluentSchemaRegistry> schema_registry_cache(SCH
static std::shared_ptr<ConfluentSchemaRegistry> getConfluentSchemaRegistry(const FormatSettings & format_settings)
{
const auto & base_url = format_settings.avro.schema_registry_url;
auto [schema_registry, loaded, _] = schema_registry_cache.getOrSet(
auto [schema_registry, loaded] = schema_registry_cache.getOrSet(
base_url,
[base_url]()
{

View File

@ -59,12 +59,12 @@ public:
MappedPtr getOrSet(const Key & key, LoadFunc && load)
{
auto result = Base::getOrSet(key, load);
if (result.cache_miss)
if (result.second)
ProfileEvents::increment(ProfileEvents::MarkCacheMisses);
else
ProfileEvents::increment(ProfileEvents::MarkCacheHits);
return result.value;
return result.first;
}
};