From 9fdb061b7e3f4d12518919d0c508b002c37e4db5 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 23 Dec 2021 17:36:41 +0800 Subject: [PATCH 01/28] Add evict policy in LRUCache --- src/Common/ErrorCodes.cpp | 2 + src/Common/LRUCache.h | 140 +++++++++++++++++++++++++++++++------- 2 files changed, 119 insertions(+), 23 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 70d85433513..07e05683340 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -604,6 +604,8 @@ M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \ M(634, MONGODB_ERROR) \ \ + M(656, CANNOT_RELEASE) \ + \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index bbc09fd3aff..c503619a7d0 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -6,12 +6,16 @@ #include #include #include - #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int CANNOT_RELEASE; +} template struct TrivialWeightFunction @@ -22,13 +26,32 @@ struct TrivialWeightFunction } }; +template +struct TrivialLRUCacheEvitPolicy +{ + // To note that the arg could be null + inline bool canRelease(std::shared_ptr) const + { + return true; + } + + // To note that the arg could be null + inline void release(std::shared_ptr) + { + } +}; + /// Thread-safe cache that evicts entries which are not used for a long time. /// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size) /// of that value. /// Cache starts to evict entries when their total weight exceeds max_size. /// Value weight should not change after insertion. -template , typename WeightFunction = TrivialWeightFunction> +template , + typename WeightFunction = TrivialWeightFunction, + typename EvictPolicy = TrivialLRUCacheEvitPolicy> class LRUCache { public: @@ -60,20 +83,41 @@ public: void set(const Key & key, const MappedPtr & mapped) { std::lock_guard lock(mutex); - setImpl(key, mapped, lock); } + /** + * trySet() will fail (return false) if there is no space left and no keys could be evicted. + * Eviction permission of each key is defined by EvictPolicy. In default policy there is no restriction. + */ + bool trySet(const Key & key, const MappedPtr & mapped) + { + std::lock_guard lock(mutex); + + return setImpl(key, mapped, lock); + } + + /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. + template + std::pair getOrSet(const Key & key, LoadFunc && load_func) + { + auto [value, is_loaded, _] = getOrTrySet(key, std::move(load_func)); + return std::make_pair(value, is_loaded); + } + /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to /// produce it, saves the result in the cache and returns it. - /// Only one of several concurrent threads calling getOrSet() will call load_func(), + /// Only one of several concurrent threads calling getOrTrySet() will call load_func(), /// others will wait for that call to complete and will use its result (this helps prevent cache stampede). /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// set of concurrent threads will then try to call its load_func etc. /// - /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. + /// return std::tuple is , where + /// - is_value_loaded indicates whether the value was produce during this call + /// - is_value_updated indicates whether the value is updated in the cache when is_value_loaded = true. + /// if is_value_loaded = false, is_value_updated = false template - std::pair getOrSet(const Key & key, LoadFunc && load_func) + std::tuple getOrTrySet(const Key &key, LoadFunc && load_func) { InsertTokenHolder token_holder; { @@ -83,7 +127,7 @@ public: if (val) { ++hits; - return std::make_pair(val, false); + return {val, false, false}; } auto & token = insert_tokens[key]; @@ -103,7 +147,7 @@ public: { /// Another thread already produced the value while we waited for token->mutex. ++hits; - return std::make_pair(token->value, false); + return {token->value, false, false}; } ++misses; @@ -113,18 +157,39 @@ public: /// Insert the new value only if the token is still in present in insert_tokens. /// (The token may be absent because of a concurrent reset() call). - bool result = false; + bool is_value_loaded = false; + bool is_value_updated = false; auto token_it = insert_tokens.find(key); if (token_it != insert_tokens.end() && token_it->second.get() == token) { - setImpl(key, token->value, cache_lock); - result = true; + // setImpl() may fail, but the final behavior seems not be affected + // next call of getOrTrySet() will still call load_func() + is_value_updated = setImpl(key, token->value, cache_lock); + is_value_loaded = true; } if (!token->cleaned_up) token_holder.cleanup(token_lock, cache_lock); - return std::make_pair(token->value, result); + return {token->value, is_value_loaded, is_value_updated}; + } + + /// If key is not in cache or the element can be released, return is true. otherwise, return is false + bool tryRemove(const Key & key) + { + std::lock_guard loc(mutex); + auto it = cells.find(key); + if (it == cells.end()) + return true; + auto & cell = it->second; + if (!evict_policy.canRelease(cell.value)) + return false; + evict_policy.release(cell.value); + + current_size -= cell.size; + cells.erase(it); + queue.erase(cell.queue_iterator); + return true; } void getStats(size_t & out_hits, size_t & out_misses) const @@ -259,6 +324,7 @@ private: std::atomic misses {0}; WeightFunction weight_function; + EvictPolicy evict_policy; MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard & cache_lock) { @@ -276,7 +342,7 @@ private: return cell.value; } - void setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard & cache_lock) + bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard & cache_lock) { auto [it, inserted] = cells.emplace(std::piecewise_construct, std::forward_as_tuple(key), @@ -286,6 +352,15 @@ private: if (inserted) { + auto value_weight = mapped ? weight_function(*mapped) : 0; + // move removeOverflow() ahead here. In default, the final result is the same as the old implementation + if (!removeOverflow(value_weight)) + { + // cannot find enough space to put in the new value + cells.erase(it); + return false; + } + try { cell.queue_iterator = queue.insert(queue.end(), key); @@ -298,6 +373,13 @@ private: } else { + if (!evict_policy.canRelease(cell.value)) + { + // the old value is refered by someone, cannot release now + // in default policy, it is always true. + return false; + } + evict_policy.release(cell.value); // release the old value. this action is empty in default policy. current_size -= cell.size; queue.splice(queue.end(), queue, cell.queue_iterator); } @@ -306,17 +388,18 @@ private: cell.size = cell.value ? weight_function(*cell.value) : 0; current_size += cell.size; - removeOverflow(); + return true; } - void removeOverflow() + bool removeOverflow(size_t required_size_to_remove = 0) { size_t current_weight_lost = 0; size_t queue_size = cells.size(); - - while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1)) + auto key_it = queue.begin(); + auto is_overflow = [&] { return (current_size + required_size_to_remove > max_size || (max_elements_size != 0 && queue_size > max_elements_size)); }; + while (is_overflow() && (queue_size > 1) && (key_it != queue.end())) { - const Key & key = queue.front(); + const Key & key = *key_it; auto it = cells.find(key); if (it == cells.end()) @@ -326,13 +409,23 @@ private: } const auto & cell = it->second; + if (evict_policy.canRelease(cell.value))// in default, it is true + { + // always call release() before erasing an element + // in default, it's an empty action + evict_policy.release(cell.value); - current_size -= cell.size; - current_weight_lost += cell.size; + current_size -= cell.size; + current_weight_lost += cell.size; - cells.erase(it); - queue.pop_front(); - --queue_size; + cells.erase(it); + key_it = queue.erase(key_it); + --queue_size; + } + else + { + key_it++; + } } onRemoveOverflowWeightLoss(current_weight_lost); @@ -342,6 +435,7 @@ private: LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it."); abort(); } + return !is_overflow(); } /// Override this method if you want to track how much weight was lost in removeOverflow method. From 59bc87e4098dd020b9a7bada7917e28bbde73ee4 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 23 Dec 2021 17:53:19 +0800 Subject: [PATCH 02/28] remove unused errorcode --- src/Common/ErrorCodes.cpp | 2 -- src/Common/LRUCache.h | 5 ----- 2 files changed, 7 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 07e05683340..70d85433513 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -604,8 +604,6 @@ M(633, QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW) \ M(634, MONGODB_ERROR) \ \ - M(656, CANNOT_RELEASE) \ - \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ M(1001, STD_EXCEPTION) \ diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index c503619a7d0..4919bbffc32 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -12,11 +12,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int CANNOT_RELEASE; -} - template struct TrivialWeightFunction { From fe4ef69fc7108bd63fa23ae76f75e23a5f50630a Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 23 Dec 2021 18:01:58 +0800 Subject: [PATCH 03/28] remove unused header --- src/Common/LRUCache.h | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 4919bbffc32..c8a5dad5731 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -7,7 +7,6 @@ #include #include #include -#include namespace DB From e25b1f7361a4c67034fb63c224eda8795f3f4e21 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 23 Dec 2021 18:05:52 +0800 Subject: [PATCH 04/28] update comments --- src/Common/LRUCache.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index c8a5dad5731..b6cacc8f7ee 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -23,13 +23,11 @@ struct TrivialWeightFunction template struct TrivialLRUCacheEvitPolicy { - // To note that the arg could be null inline bool canRelease(std::shared_ptr) const { return true; } - // To note that the arg could be null inline void release(std::shared_ptr) { } From 5a9bd7fd41d599e1d04adb9760bd6fd38a8f211b Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 23 Dec 2021 18:13:56 +0800 Subject: [PATCH 05/28] fixe typos --- src/Common/LRUCache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index b6cacc8f7ee..856a230674b 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -367,7 +367,7 @@ private: { if (!evict_policy.canRelease(cell.value)) { - // the old value is refered by someone, cannot release now + // the old value is referred by someone, cannot release now // in default policy, it is always true. return false; } From 51e43ae6194411a413fc48b5aed9a4c7127c02a3 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 10:20:07 +0800 Subject: [PATCH 06/28] some modification in LRUCache --- src/Common/LRUCache.h | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 856a230674b..2ca5e007716 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -21,7 +21,7 @@ struct TrivialWeightFunction }; template -struct TrivialLRUCacheEvitPolicy +struct TrivialLRUCacheEvictPolicy { inline bool canRelease(std::shared_ptr) const { @@ -43,13 +43,21 @@ template , typename WeightFunction = TrivialWeightFunction, - typename EvictPolicy = TrivialLRUCacheEvitPolicy> + typename EvictPolicy = TrivialLRUCacheEvictPolicy> class LRUCache { public: using Key = TKey; using Mapped = TMapped; using MappedPtr = std::shared_ptr; + + struct Result + { + MappedPtr value; + bool cache_miss = true; + // set_successful is not trustworthy for getOrSet, because removeOverflow is called right after putting key in cache + bool set_successful = false; + }; /** Initialize LRUCache with max_size and max_elements_size. * max_elements_size == 0 means no elements size restrictions. @@ -89,12 +97,11 @@ public: return setImpl(key, mapped, lock); } - /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. template std::pair getOrSet(const Key & key, LoadFunc && load_func) { - auto [value, is_loaded, _] = getOrTrySet(key, std::move(load_func)); - return std::make_pair(value, is_loaded); + auto result = getOrTrySet(key, std::move(load_func)); + return std::make_pair(result.value, result.cache_miss); } /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to @@ -104,12 +111,8 @@ public: /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// set of concurrent threads will then try to call its load_func etc. /// - /// return std::tuple is , where - /// - is_value_loaded indicates whether the value was produce during this call - /// - is_value_updated indicates whether the value is updated in the cache when is_value_loaded = true. - /// if is_value_loaded = false, is_value_updated = false template - std::tuple getOrTrySet(const Key &key, LoadFunc && load_func) + Result getOrTrySet(const Key &key, LoadFunc && load_func) { InsertTokenHolder token_holder; { @@ -345,7 +348,6 @@ private: if (inserted) { auto value_weight = mapped ? weight_function(*mapped) : 0; - // move removeOverflow() ahead here. In default, the final result is the same as the old implementation if (!removeOverflow(value_weight)) { // cannot find enough space to put in the new value From ef1d7142f52f88a0d5a996ba0b669de0579d79f3 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 16:12:39 +0800 Subject: [PATCH 07/28] remove getOrTrySet --- src/Common/LRUCache.h | 15 +++++---------- src/IO/MMappedFileCache.h | 4 ++-- src/IO/UncompressedCache.h | 4 ++-- src/Interpreters/Aggregator.cpp | 4 ++-- src/Interpreters/ExpressionJIT.cpp | 3 ++- src/Interpreters/MergeJoin.cpp | 2 +- .../Formats/Impl/AvroRowInputFormat.cpp | 4 ++-- src/Storages/MarkCache.h | 4 ++-- 8 files changed, 18 insertions(+), 22 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 2ca5e007716..f782812b04d 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -50,12 +50,14 @@ public: using Key = TKey; using Mapped = TMapped; using MappedPtr = std::shared_ptr; - + struct Result { MappedPtr value; + // if key is in cache, cache_miss is true bool cache_miss = true; - // set_successful is not trustworthy for getOrSet, because removeOverflow is called right after putting key in cache + // set_successful is false in default + // when value is loaded by load_fun in getOrSet(), and setImpl returns true, set_successful = true bool set_successful = false; }; @@ -97,13 +99,6 @@ public: return setImpl(key, mapped, lock); } - template - std::pair getOrSet(const Key & key, LoadFunc && load_func) - { - auto result = getOrTrySet(key, std::move(load_func)); - return std::make_pair(result.value, result.cache_miss); - } - /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to /// produce it, saves the result in the cache and returns it. /// Only one of several concurrent threads calling getOrTrySet() will call load_func(), @@ -112,7 +107,7 @@ public: /// set of concurrent threads will then try to call its load_func etc. /// template - Result getOrTrySet(const Key &key, LoadFunc && load_func) + Result getOrSet(const Key &key, LoadFunc && load_func) { InsertTokenHolder token_holder; { diff --git a/src/IO/MMappedFileCache.h b/src/IO/MMappedFileCache.h index adbb85a18cf..7ee6957c7db 100644 --- a/src/IO/MMappedFileCache.h +++ b/src/IO/MMappedFileCache.h @@ -48,12 +48,12 @@ public: MappedPtr getOrSet(const Key & key, LoadFunc && load) { auto result = Base::getOrSet(key, load); - if (result.second) + if (result.cache_miss) ProfileEvents::increment(ProfileEvents::MMappedFileCacheMisses); else ProfileEvents::increment(ProfileEvents::MMappedFileCacheHits); - return result.first; + return result.value; } }; diff --git a/src/IO/UncompressedCache.h b/src/IO/UncompressedCache.h index 5826b7f020a..78f81c15a4a 100644 --- a/src/IO/UncompressedCache.h +++ b/src/IO/UncompressedCache.h @@ -63,12 +63,12 @@ public: { auto result = Base::getOrSet(key, std::forward(load)); - if (result.second) + if (result.cache_miss) ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses); else ProfileEvents::increment(ProfileEvents::UncompressedCacheHits); - return result.first; + return result.value; } private: diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index ae5ce117c61..95341efa76a 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -354,14 +354,14 @@ void Aggregator::compileAggregateFunctionsIfNeeded() if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache()) { - auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] () + auto result = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] () { LOG_TRACE(log, "Compile expression {}", functions_description); auto compiled_aggregate_functions = compileAggregateFunctions(getJITInstance(), functions_to_compile, functions_description); return std::make_shared(std::move(compiled_aggregate_functions)); }); - + auto compiled_function_cache_entry = result.value; compiled_aggregate_functions_holder = std::static_pointer_cast(compiled_function_cache_entry); } else diff --git a/src/Interpreters/ExpressionJIT.cpp b/src/Interpreters/ExpressionJIT.cpp index 90292d17fae..d5017b18dc1 100644 --- a/src/Interpreters/ExpressionJIT.cpp +++ b/src/Interpreters/ExpressionJIT.cpp @@ -296,12 +296,13 @@ static FunctionBasePtr compile( if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache()) { - auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(hash_key, [&] () + auto result = compilation_cache->getOrSet(hash_key, [&] () { LOG_TRACE(getLogger(), "Compile expression {}", llvm_function->getName()); auto compiled_function = compileFunction(getJITInstance(), *llvm_function); return std::make_shared(compiled_function); }); + auto compiled_function_cache_entry = result.value; std::shared_ptr compiled_function_holder = std::static_pointer_cast(compiled_function_cache_entry); llvm_function->setCompiledFunction(std::move(compiled_function_holder)); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index 7f22386f54b..d5ea1682dff 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1033,7 +1033,7 @@ std::shared_ptr MergeJoin::loadRightBlock(size_t pos) const return std::make_shared(input.block_in->read()); }; - return cached_right_blocks->getOrSet(pos, load_func).first; + return cached_right_blocks->getOrSet(pos, load_func).value; } else return loaded_right_blocks[pos]; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index 11e56ecbe0c..c415f27d202 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -650,7 +650,7 @@ public: avro::ValidSchema getSchema(uint32_t id) { - auto [schema, loaded] = schema_cache.getOrSet( + auto [schema, loaded, _] = schema_cache.getOrSet( id, [this, id](){ return std::make_shared(fetchSchema(id)); } ); @@ -727,7 +727,7 @@ static LRUCache schema_registry_cache(SCH static std::shared_ptr getConfluentSchemaRegistry(const FormatSettings & format_settings) { const auto & base_url = format_settings.avro.schema_registry_url; - auto [schema_registry, loaded] = schema_registry_cache.getOrSet( + auto [schema_registry, loaded, _] = schema_registry_cache.getOrSet( base_url, [base_url]() { diff --git a/src/Storages/MarkCache.h b/src/Storages/MarkCache.h index 06143e954f8..3438b4a1b9b 100644 --- a/src/Storages/MarkCache.h +++ b/src/Storages/MarkCache.h @@ -59,12 +59,12 @@ public: MappedPtr getOrSet(const Key & key, LoadFunc && load) { auto result = Base::getOrSet(key, load); - if (result.second) + if (result.cache_miss) ProfileEvents::increment(ProfileEvents::MarkCacheMisses); else ProfileEvents::increment(ProfileEvents::MarkCacheHits); - return result.first; + return result.value; } }; From 3367755dcc71ea1c0d4159193fb0ef00b4989ff6 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 16:21:10 +0800 Subject: [PATCH 08/28] update comment --- src/Common/LRUCache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index f782812b04d..049d07c74f4 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -101,7 +101,7 @@ public: /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to /// produce it, saves the result in the cache and returns it. - /// Only one of several concurrent threads calling getOrTrySet() will call load_func(), + /// Only one of several concurrent threads calling getOrSet() will call load_func(), /// others will wait for that call to complete and will use its result (this helps prevent cache stampede). /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// set of concurrent threads will then try to call its load_func etc. From 54b2a42530c687068e33428dd6ee713296a9ee98 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 18:02:02 +0800 Subject: [PATCH 09/28] update variable names --- src/Common/LRUCache.h | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 049d07c74f4..3f13b4352c3 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -148,20 +148,20 @@ public: /// Insert the new value only if the token is still in present in insert_tokens. /// (The token may be absent because of a concurrent reset() call). bool is_value_loaded = false; - bool is_value_updated = false; + bool is_value_loaded_and_set = false; auto token_it = insert_tokens.find(key); if (token_it != insert_tokens.end() && token_it->second.get() == token) { // setImpl() may fail, but the final behavior seems not be affected // next call of getOrTrySet() will still call load_func() - is_value_updated = setImpl(key, token->value, cache_lock); + is_value_loaded_and_set = setImpl(key, token->value, cache_lock); is_value_loaded = true; } if (!token->cleaned_up) token_holder.cleanup(token_lock, cache_lock); - return {token->value, is_value_loaded, is_value_updated}; + return {token->value, is_value_loaded, is_value_loaded_and_set}; } /// If key is not in cache or the element can be released, return is true. otherwise, return is false @@ -363,11 +363,7 @@ private: else { if (!evict_policy.canRelease(cell.value)) - { - // the old value is referred by someone, cannot release now - // in default policy, it is always true. return false; - } evict_policy.release(cell.value); // release the old value. this action is empty in default policy. current_size -= cell.size; queue.splice(queue.end(), queue, cell.queue_iterator); @@ -377,6 +373,7 @@ private: cell.size = cell.value ? weight_function(*cell.value) : 0; current_size += cell.size; + removeOverflow(); return true; } From f86e3ef1ae24d2fbb5179900136d53ea17c60eb7 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 18:12:40 +0800 Subject: [PATCH 10/28] update the evict action when overwrite a key --- src/Common/LRUCache.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 3f13b4352c3..700ef3b0ceb 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -339,10 +339,10 @@ private: std::forward_as_tuple()); Cell & cell = it->second; + auto value_weight = mapped ? weight_function(*mapped) : 0; if (inserted) { - auto value_weight = mapped ? weight_function(*mapped) : 0; if (!removeOverflow(value_weight)) { // cannot find enough space to put in the new value @@ -364,6 +364,8 @@ private: { if (!evict_policy.canRelease(cell.value)) return false; + if (value_weight > cell.size && !removeOverflow(value_weight - cell.size)) + return false; evict_policy.release(cell.value); // release the old value. this action is empty in default policy. current_size -= cell.size; queue.splice(queue.end(), queue, cell.queue_iterator); @@ -373,7 +375,6 @@ private: cell.size = cell.value ? weight_function(*cell.value) : 0; current_size += cell.size; - removeOverflow(); return true; } From 922dd40c05b8295303a3b7b9d981363018b2f057 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 20:33:25 +0800 Subject: [PATCH 11/28] optimization --- src/Common/LRUCache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 700ef3b0ceb..a5c4990fecd 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -372,7 +372,7 @@ private: } cell.value = mapped; - cell.size = cell.value ? weight_function(*cell.value) : 0; + cell.size = value_weight; current_size += cell.size; return true; From 26e61061b0cf125cae246e981e2bf34dfa777da1 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Mon, 27 Dec 2021 20:46:51 +0800 Subject: [PATCH 12/28] update setImpl() --- src/Common/LRUCache.h | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index a5c4990fecd..652b02cad64 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -343,13 +343,6 @@ private: if (inserted) { - if (!removeOverflow(value_weight)) - { - // cannot find enough space to put in the new value - cells.erase(it); - return false; - } - try { cell.queue_iterator = queue.insert(queue.end(), key); @@ -359,6 +352,14 @@ private: cells.erase(it); throw; } + + if (!removeOverflow()) + { + // overflow is caused by inserting this element. + queue.erase(cell.queue_iterator); + cells.erase(it); + return false; + } } else { From 413fb290bbab73923e858ab3da94fa359f455436 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Tue, 28 Dec 2021 09:19:21 +0800 Subject: [PATCH 13/28] remove comments --- src/Common/LRUCache.h | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 652b02cad64..97d70b5598c 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -152,8 +152,6 @@ public: auto token_it = insert_tokens.find(key); if (token_it != insert_tokens.end() && token_it->second.get() == token) { - // setImpl() may fail, but the final behavior seems not be affected - // next call of getOrTrySet() will still call load_func() is_value_loaded_and_set = setImpl(key, token->value, cache_lock); is_value_loaded = true; } From 59cbd768800614227fa478e1e09c3bb092d69fee Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 15:25:33 +0800 Subject: [PATCH 14/28] Add LRUResourceCache 1. add LRUResourceCache for managing resource cache in lru policy 2. rollback LRUCache to the original version 3. add remove() in LRUCache 4. add unit tests for LRUResourceCache and LRUCache --- src/Common/LRUCache.h | 131 +++------ src/Common/LRUResourceCache.h | 254 ++++++++++++++++++ src/Common/tests/gtest_lru_cache.cpp | 102 +++++++ src/Common/tests/gtest_lru_resource_cache.cpp | 212 +++++++++++++++ src/IO/MMappedFileCache.h | 4 +- src/IO/UncompressedCache.h | 4 +- src/Interpreters/Aggregator.cpp | 3 +- src/Interpreters/ExpressionJIT.cpp | 3 +- src/Interpreters/MergeJoin.cpp | 2 +- .../Formats/Impl/AvroRowInputFormat.cpp | 4 +- src/Storages/MarkCache.h | 4 +- 11 files changed, 610 insertions(+), 113 deletions(-) create mode 100644 src/Common/LRUResourceCache.h create mode 100644 src/Common/tests/gtest_lru_cache.cpp create mode 100644 src/Common/tests/gtest_lru_resource_cache.cpp diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 97d70b5598c..1058003a327 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -6,11 +6,13 @@ #include #include #include + #include namespace DB { + template struct TrivialWeightFunction { @@ -20,30 +22,13 @@ struct TrivialWeightFunction } }; -template -struct TrivialLRUCacheEvictPolicy -{ - inline bool canRelease(std::shared_ptr) const - { - return true; - } - - inline void release(std::shared_ptr) - { - } -}; - /// Thread-safe cache that evicts entries which are not used for a long time. /// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size) /// of that value. /// Cache starts to evict entries when their total weight exceeds max_size. /// Value weight should not change after insertion. -template , - typename WeightFunction = TrivialWeightFunction, - typename EvictPolicy = TrivialLRUCacheEvictPolicy> +template , typename WeightFunction = TrivialWeightFunction> class LRUCache { public: @@ -51,16 +36,6 @@ public: using Mapped = TMapped; using MappedPtr = std::shared_ptr; - struct Result - { - MappedPtr value; - // if key is in cache, cache_miss is true - bool cache_miss = true; - // set_successful is false in default - // when value is loaded by load_fun in getOrSet(), and setImpl returns true, set_successful = true - bool set_successful = false; - }; - /** Initialize LRUCache with max_size and max_elements_size. * max_elements_size == 0 means no elements size restrictions. */ @@ -85,18 +60,19 @@ public: void set(const Key & key, const MappedPtr & mapped) { std::lock_guard lock(mutex); + setImpl(key, mapped, lock); } - /** - * trySet() will fail (return false) if there is no space left and no keys could be evicted. - * Eviction permission of each key is defined by EvictPolicy. In default policy there is no restriction. - */ - bool trySet(const Key & key, const MappedPtr & mapped) + void remove(const Key & key) { std::lock_guard lock(mutex); - - return setImpl(key, mapped, lock); + auto it = cells.find(key); + if (it == cells.end()) + return; + auto & cell = it->second; + current_size -= cell.size; + cells.erase(it); } /// If the value for the key is in the cache, returns it. If it is not, calls load_func() to @@ -106,8 +82,9 @@ public: /// Exceptions occurring in load_func will be propagated to the caller. Another thread from the /// set of concurrent threads will then try to call its load_func etc. /// + /// Returns std::pair of the cached value and a bool indicating whether the value was produced during this call. template - Result getOrSet(const Key &key, LoadFunc && load_func) + std::pair getOrSet(const Key & key, LoadFunc && load_func) { InsertTokenHolder token_holder; { @@ -117,7 +94,7 @@ public: if (val) { ++hits; - return {val, false, false}; + return std::make_pair(val, false); } auto & token = insert_tokens[key]; @@ -137,7 +114,7 @@ public: { /// Another thread already produced the value while we waited for token->mutex. ++hits; - return {token->value, false, false}; + return std::make_pair(token->value, false); } ++misses; @@ -147,37 +124,18 @@ public: /// Insert the new value only if the token is still in present in insert_tokens. /// (The token may be absent because of a concurrent reset() call). - bool is_value_loaded = false; - bool is_value_loaded_and_set = false; + bool result = false; auto token_it = insert_tokens.find(key); if (token_it != insert_tokens.end() && token_it->second.get() == token) { - is_value_loaded_and_set = setImpl(key, token->value, cache_lock); - is_value_loaded = true; + setImpl(key, token->value, cache_lock); + result = true; } if (!token->cleaned_up) token_holder.cleanup(token_lock, cache_lock); - return {token->value, is_value_loaded, is_value_loaded_and_set}; - } - - /// If key is not in cache or the element can be released, return is true. otherwise, return is false - bool tryRemove(const Key & key) - { - std::lock_guard loc(mutex); - auto it = cells.find(key); - if (it == cells.end()) - return true; - auto & cell = it->second; - if (!evict_policy.canRelease(cell.value)) - return false; - evict_policy.release(cell.value); - - current_size -= cell.size; - cells.erase(it); - queue.erase(cell.queue_iterator); - return true; + return std::make_pair(token->value, result); } void getStats(size_t & out_hits, size_t & out_misses) const @@ -312,7 +270,6 @@ private: std::atomic misses {0}; WeightFunction weight_function; - EvictPolicy evict_policy; MappedPtr getImpl(const Key & key, [[maybe_unused]] std::lock_guard & cache_lock) { @@ -330,14 +287,13 @@ private: return cell.value; } - bool setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard & cache_lock) + void setImpl(const Key & key, const MappedPtr & mapped, [[maybe_unused]] std::lock_guard & cache_lock) { auto [it, inserted] = cells.emplace(std::piecewise_construct, std::forward_as_tuple(key), std::forward_as_tuple()); Cell & cell = it->second; - auto value_weight = mapped ? weight_function(*mapped) : 0; if (inserted) { @@ -350,42 +306,28 @@ private: cells.erase(it); throw; } - - if (!removeOverflow()) - { - // overflow is caused by inserting this element. - queue.erase(cell.queue_iterator); - cells.erase(it); - return false; - } } else { - if (!evict_policy.canRelease(cell.value)) - return false; - if (value_weight > cell.size && !removeOverflow(value_weight - cell.size)) - return false; - evict_policy.release(cell.value); // release the old value. this action is empty in default policy. current_size -= cell.size; queue.splice(queue.end(), queue, cell.queue_iterator); } cell.value = mapped; - cell.size = value_weight; + cell.size = cell.value ? weight_function(*cell.value) : 0; current_size += cell.size; - return true; + removeOverflow(); } - bool removeOverflow(size_t required_size_to_remove = 0) + void removeOverflow() { size_t current_weight_lost = 0; size_t queue_size = cells.size(); - auto key_it = queue.begin(); - auto is_overflow = [&] { return (current_size + required_size_to_remove > max_size || (max_elements_size != 0 && queue_size > max_elements_size)); }; - while (is_overflow() && (queue_size > 1) && (key_it != queue.end())) + + while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 1)) { - const Key & key = *key_it; + const Key & key = queue.front(); auto it = cells.find(key); if (it == cells.end()) @@ -395,23 +337,13 @@ private: } const auto & cell = it->second; - if (evict_policy.canRelease(cell.value))// in default, it is true - { - // always call release() before erasing an element - // in default, it's an empty action - evict_policy.release(cell.value); - current_size -= cell.size; - current_weight_lost += cell.size; + current_size -= cell.size; + current_weight_lost += cell.size; - cells.erase(it); - key_it = queue.erase(key_it); - --queue_size; - } - else - { - key_it++; - } + cells.erase(it); + queue.pop_front(); + --queue_size; } onRemoveOverflowWeightLoss(current_weight_lost); @@ -421,7 +353,6 @@ private: LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it."); abort(); } - return !is_overflow(); } /// Override this method if you want to track how much weight was lost in removeOverflow method. diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h new file mode 100644 index 00000000000..7250b44a055 --- /dev/null +++ b/src/Common/LRUResourceCache.h @@ -0,0 +1,254 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +template +struct TrivailLRUResourceCacheWeightFunction +{ + size_t operator()(const T &) const { return 1; } +}; + +/* + * A resource cache with key index. There is only one instance for every key which is not like the normal resource pool. + * Resource cache has max weight capacity and keys size limitation. If the limitation is exceeded, keys would be evicted + * by LRU policy. + * + * acquire and release must be used in pair. + */ +template < + typename TKey, + typename TMapped, + typename WeightFunction = TrivailLRUResourceCacheWeightFunction, + typename HashFunction = std::hash> +class LRUResourceCache +{ +public: + LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { } + virtual ~LRUResourceCache() { } + using Key = TKey; + using Mapped = TMapped; + using MappedPtr = std::shared_ptr; + + // - load_func : when key is not exists in cache, load_func is called to generate a new key + // - return: is null when there is no more space for the new value or the old value is in used. + template + MappedPtr acquire(const Key & key, LoadFunc && load_func) + { + InsertToken * insert_token = nullptr; + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it != cells.end()) + { + hits++; + it->second.reference_count += 1; + return it->second.value; + } + misses++; + insert_token = acquireInsertToken(key); + } + Cell * cell_ptr = nullptr; + { + std::lock_guard lock(insert_token->mutex); + if (!insert_token->value) + { + insert_token->value = load_func(); + std::lock_guard cell_lock(mutex); + cell_ptr = insert_value(key, insert_token->value); + if (cell_ptr) + { + cell_ptr->reference_count += 1; + } + else + { + insert_token->value = nullptr; + } + } + } + + std::lock_guard lock(mutex); + releaseInsertToken(key); + if (cell_ptr) + { + return cell_ptr->value; + } + return nullptr; + } + + MappedPtr acquire(const Key & key) + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it == cells.end()) + { + misses++; + return nullptr; + } + hits++; + it->second.reference_count += 1; + queue.splice(queue.end(), queue, it->second.queue_iterator); + return it->second.value; + } + + // mark a reference is released + void release(const Key & key) + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it == cells.end() || it->second.reference_count == 0) + { + LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "try to release an invalid element"); + abort(); + } + it->second.reference_count -= 1; + } + + // If you want to update a value, call tryRemove() at first and then call acquire() with load_func. + bool tryRemove(const Key & key) + { + std::lock_guard guard(mutex); + auto it = cells.find(key); + if (it == cells.end()) + return true; + auto & cell = it->second; + if (cell.reference_count) + return false; + queue.erase(cell.queue_iterator); + current_weight -= cell.weight; + cells.erase(it); + return true; + } + + size_t weight() + { + std::lock_guard lock(mutex); + return current_weight; + } + + size_t size() + { + std::lock_guard lock(mutex); + return cells.size(); + } + + void getStats(size_t & out_hits, size_t & out_misses, size_t & out_evict_count) const + { + out_hits = hits; + out_misses = misses; + out_evict_count = evict_count; + } + +private: + mutable std::mutex mutex; + + using LRUQueue = std::list; + using LRUQueueIterator = typename LRUQueue::iterator; + + struct Cell + { + MappedPtr value; + size_t weight = 0; + LRUQueueIterator queue_iterator; + size_t reference_count = 0; + }; + + using Cells = std::unordered_map; + Cells cells; + LRUQueue queue; + size_t current_weight = 0; + size_t max_weight = 0; + size_t max_element_size = 0; + + struct InsertToken + { + std::mutex mutex; + MappedPtr value; + size_t reference_count = 0; + }; + using InsertTokens = std::unordered_map; + InsertTokens insert_tokens; + WeightFunction weight_function; + std::atomic hits{0}; + std::atomic misses{0}; + std::atomic evict_count{0}; + + InsertToken * acquireInsertToken(const Key & key) + { + auto & token = insert_tokens[key]; + token.reference_count += 1; + return &token; + } + + void releaseInsertToken(const Key & key) + { + auto it = insert_tokens.find(key); + if (it != insert_tokens.end()) + { + it->second.reference_count -= 1; + if (it->second.reference_count == 0) + insert_tokens.erase(it); + } + } + + // key mustn't be in the cache + Cell * insert_value(const Key & insert_key, MappedPtr value) + { + auto weight = value ? weight_function(*value) : 0; + auto queue_size = cells.size() + 1; + auto loss_weight = 0; + auto is_overflow = [&] { + return current_weight + weight - loss_weight > max_weight || (max_element_size != 0 && queue_size > max_element_size); + }; + auto key_it = queue.begin(); + std::unordered_set to_release_keys; + while (is_overflow() && queue_size > 1 && key_it != queue.end()) + { + const Key & key = *key_it; + auto cell_it = cells.find(key); + if (cell_it == cells.end()) + { + LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "LRUResourceCache became inconsistent. There must be a bug in it."); + abort(); + } + auto & cell = cell_it->second; + if (cell.reference_count == 0) + { + loss_weight += cell.weight; + queue_size -= 1; + to_release_keys.insert(key); + } + key_it++; + } + if (is_overflow()) + return nullptr; + if (loss_weight > current_weight + weight) + { + LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "LRUResourceCache became inconsistent. There must be a bug in it."); + abort(); + } + for (auto & key : to_release_keys) + { + auto & cell = cells[key]; + queue.erase(cell.queue_iterator); + cells.erase(key); + evict_count++; + } + current_weight = current_weight + weight - loss_weight; + + auto & new_cell = cells[insert_key]; + new_cell.value = value; + new_cell.weight = weight; + new_cell.queue_iterator = queue.insert(queue.end(), insert_key); + return &new_cell; + } +}; +} diff --git a/src/Common/tests/gtest_lru_cache.cpp b/src/Common/tests/gtest_lru_cache.cpp new file mode 100644 index 00000000000..42e404de379 --- /dev/null +++ b/src/Common/tests/gtest_lru_cache.cpp @@ -0,0 +1,102 @@ +#include +#include +#include +#include + +TEST(LRUCache, set) +{ + using SimpleLRUCache =DB::LRUCache; + auto lru_cache = SimpleLRUCache(10, 10); + lru_cache.set(1, std::make_shared(2)); + lru_cache.set(2,std::make_shared(3)); + + auto w = lru_cache.weight(); + auto n = lru_cache.count(); + ASSERT_EQ(w, 2); + ASSERT_EQ(n, 2); +} + +TEST(LRUCache, update) +{ + using SimpleLRUCache =DB::LRUCache; + auto lru_cache = SimpleLRUCache(10, 10); + lru_cache.set(1, std::make_shared(2)); + lru_cache.set(1,std::make_shared(3)); + auto val = lru_cache.get(1); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 3); +} + +TEST(LRUCache, get) +{ + using SimpleLRUCache =DB::LRUCache; + auto lru_cache = SimpleLRUCache(10, 10); + lru_cache.set(1, std::make_shared(2)); + lru_cache.set(2, std::make_shared(3)); + SimpleLRUCache::MappedPtr value = lru_cache.get(1); + ASSERT_TRUE(value != nullptr); + ASSERT_EQ(*value, 2); + + value = lru_cache.get(2); + ASSERT_TRUE(value != nullptr); + ASSERT_EQ(*value, 3); +} + +struct ValueWeight +{ + size_t operator()(const size_t & x) const + { + return x; + } +}; + +TEST(LRUCache, evict_on_size) +{ + + using SimpleLRUCache =DB::LRUCache; + auto lru_cache = SimpleLRUCache(20, 3); + lru_cache.set(1, std::make_shared(2)); + lru_cache.set(2, std::make_shared(3)); + lru_cache.set(3, std::make_shared(4)); + lru_cache.set(4, std::make_shared(5)); + + auto n = lru_cache.count(); + ASSERT_EQ(n, 3); + + auto value = lru_cache.get(1); + ASSERT_TRUE(value == nullptr); +} + +TEST(LRUCache, evict_on_weight) +{ + + using SimpleLRUCache =DB::LRUCache, ValueWeight>; + auto lru_cache = SimpleLRUCache(10, 10); + lru_cache.set(1, std::make_shared(2)); + lru_cache.set(2, std::make_shared(3)); + lru_cache.set(3, std::make_shared(4)); + lru_cache.set(4, std::make_shared(5)); + + auto n = lru_cache.count(); + ASSERT_EQ(n, 2); + + auto w = lru_cache.weight(); + ASSERT_EQ(w, 9); + + auto value = lru_cache.get(1); + ASSERT_TRUE(value == nullptr); + value = lru_cache.get(2); + ASSERT_TRUE(value == nullptr); +} + +TEST(LRUCache, getOrSet) +{ + using SimpleLRUCache =DB::LRUCache, ValueWeight>; + auto lru_cache = SimpleLRUCache(10, 10); + size_t x = 10; + auto load_func = [&]{ return std::make_shared(x); }; + auto [value, loaded] = lru_cache.getOrSet(1, load_func); + ASSERT_TRUE(value != nullptr); + ASSERT_TRUE(*value == 10); +} + diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp new file mode 100644 index 00000000000..79317d83936 --- /dev/null +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -0,0 +1,212 @@ +#include +#include +#include +#include + +TEST(LRUResourceCache, acquire) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(10, 10); + int x = 10; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + x = 11; + val = mcache.acquire(2, load_int); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 11); + + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 10); +} + +TEST(LRUResourceCache, remove) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(10, 10); + int x = 10; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + x = 11; + val = mcache.acquire(2, load_int); + + auto succ = mcache.tryRemove(3); + ASSERT_TRUE(succ); + + succ = mcache.tryRemove(1); + ASSERT_TRUE(!succ); + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 10); + + mcache.release(1); + succ = mcache.tryRemove(1); + ASSERT_TRUE(!succ); + mcache.release(1); + succ = mcache.tryRemove(1); + ASSERT_TRUE(succ); + val = mcache.acquire(1); + ASSERT_TRUE(val == nullptr); +} + +struct MyWeight +{ + size_t operator()(const int & x) const { return static_cast(x); } +}; + +TEST(LRUResourceCache, evict_on_weight) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(5, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + + val = mcache.acquire(2, load_int); + mcache.release(2); + + x = 3; + val = mcache.acquire(3, load_int); + ASSERT_TRUE(val != nullptr); + + auto w = mcache.weight(); + ASSERT_EQ(w, 5); + auto n = mcache.size(); + ASSERT_EQ(n, 2); + + val = mcache.acquire(1); + ASSERT_TRUE(val == nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val != nullptr); +} + +TEST(LRUResourceCache, evict_on_size) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(5, 2); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + + val = mcache.acquire(2, load_int); + mcache.release(2); + + x = 3; + val = mcache.acquire(3, load_int); + ASSERT_TRUE(val != nullptr); + + auto n = mcache.size(); + ASSERT_EQ(n, 2); + auto w = mcache.weight(); + ASSERT_EQ(w, 2); + + val = mcache.acquire(1); + ASSERT_TRUE(val == nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val != nullptr); +} + +TEST(LRUResourceCache, not_evict_used_element) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(7, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + + val = mcache.acquire(2, load_int); + mcache.release(2); + + val = mcache.acquire(3, load_int); + mcache.release(3); + + x = 3; + val = mcache.acquire(4, load_int); + ASSERT_TRUE(val != nullptr); + + auto n = mcache.size(); + ASSERT_EQ(n, 3); + auto w = mcache.weight(); + ASSERT_EQ(w, 7); + + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val == nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(4); + ASSERT_TRUE(val != nullptr); +} + +TEST(LRUResourceCache, acquire_fail) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(5, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + val = mcache.acquire(2, load_int); + val = mcache.acquire(3, load_int); + ASSERT_TRUE(val == nullptr); + + auto n = mcache.size(); + ASSERT_EQ(n, 2); + auto w = mcache.weight(); + ASSERT_EQ(w, 4); + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val == nullptr); +} + +TEST(LRUResourceCache, dup_acquire) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(20, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + x = 11; + val = mcache.acquire(1, load_int); + ASSERT_TRUE(val != nullptr); + + auto n = mcache.size(); + ASSERT_EQ(n, 1); + auto w = mcache.weight(); + ASSERT_EQ(w, 2); + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 2); +} + +TEST(LRUResourceCache, re_acquire) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(20, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + mcache.tryRemove(1); + x = 11; + val = mcache.acquire(1, load_int); + ASSERT_TRUE(val != nullptr); + + auto n = mcache.size(); + ASSERT_EQ(n, 1); + auto w = mcache.weight(); + ASSERT_EQ(w, 11); + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + ASSERT_TRUE(*val == 11); +} diff --git a/src/IO/MMappedFileCache.h b/src/IO/MMappedFileCache.h index 7ee6957c7db..adbb85a18cf 100644 --- a/src/IO/MMappedFileCache.h +++ b/src/IO/MMappedFileCache.h @@ -48,12 +48,12 @@ public: MappedPtr getOrSet(const Key & key, LoadFunc && load) { auto result = Base::getOrSet(key, load); - if (result.cache_miss) + if (result.second) ProfileEvents::increment(ProfileEvents::MMappedFileCacheMisses); else ProfileEvents::increment(ProfileEvents::MMappedFileCacheHits); - return result.value; + return result.first; } }; diff --git a/src/IO/UncompressedCache.h b/src/IO/UncompressedCache.h index 78f81c15a4a..5826b7f020a 100644 --- a/src/IO/UncompressedCache.h +++ b/src/IO/UncompressedCache.h @@ -63,12 +63,12 @@ public: { auto result = Base::getOrSet(key, std::forward(load)); - if (result.cache_miss) + if (result.second) ProfileEvents::increment(ProfileEvents::UncompressedCacheMisses); else ProfileEvents::increment(ProfileEvents::UncompressedCacheHits); - return result.value; + return result.first; } private: diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 95341efa76a..5c9d94d7c45 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -354,14 +354,13 @@ void Aggregator::compileAggregateFunctionsIfNeeded() if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache()) { - auto result = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] () + auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(aggregate_functions_description_hash_key, [&] () { LOG_TRACE(log, "Compile expression {}", functions_description); auto compiled_aggregate_functions = compileAggregateFunctions(getJITInstance(), functions_to_compile, functions_description); return std::make_shared(std::move(compiled_aggregate_functions)); }); - auto compiled_function_cache_entry = result.value; compiled_aggregate_functions_holder = std::static_pointer_cast(compiled_function_cache_entry); } else diff --git a/src/Interpreters/ExpressionJIT.cpp b/src/Interpreters/ExpressionJIT.cpp index d5017b18dc1..90292d17fae 100644 --- a/src/Interpreters/ExpressionJIT.cpp +++ b/src/Interpreters/ExpressionJIT.cpp @@ -296,13 +296,12 @@ static FunctionBasePtr compile( if (auto * compilation_cache = CompiledExpressionCacheFactory::instance().tryGetCache()) { - auto result = compilation_cache->getOrSet(hash_key, [&] () + auto [compiled_function_cache_entry, _] = compilation_cache->getOrSet(hash_key, [&] () { LOG_TRACE(getLogger(), "Compile expression {}", llvm_function->getName()); auto compiled_function = compileFunction(getJITInstance(), *llvm_function); return std::make_shared(compiled_function); }); - auto compiled_function_cache_entry = result.value; std::shared_ptr compiled_function_holder = std::static_pointer_cast(compiled_function_cache_entry); llvm_function->setCompiledFunction(std::move(compiled_function_holder)); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index d5ea1682dff..7f22386f54b 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1033,7 +1033,7 @@ std::shared_ptr MergeJoin::loadRightBlock(size_t pos) const return std::make_shared(input.block_in->read()); }; - return cached_right_blocks->getOrSet(pos, load_func).value; + return cached_right_blocks->getOrSet(pos, load_func).first; } else return loaded_right_blocks[pos]; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index c415f27d202..11e56ecbe0c 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -650,7 +650,7 @@ public: avro::ValidSchema getSchema(uint32_t id) { - auto [schema, loaded, _] = schema_cache.getOrSet( + auto [schema, loaded] = schema_cache.getOrSet( id, [this, id](){ return std::make_shared(fetchSchema(id)); } ); @@ -727,7 +727,7 @@ static LRUCache schema_registry_cache(SCH static std::shared_ptr getConfluentSchemaRegistry(const FormatSettings & format_settings) { const auto & base_url = format_settings.avro.schema_registry_url; - auto [schema_registry, loaded, _] = schema_registry_cache.getOrSet( + auto [schema_registry, loaded] = schema_registry_cache.getOrSet( base_url, [base_url]() { diff --git a/src/Storages/MarkCache.h b/src/Storages/MarkCache.h index 3438b4a1b9b..06143e954f8 100644 --- a/src/Storages/MarkCache.h +++ b/src/Storages/MarkCache.h @@ -59,12 +59,12 @@ public: MappedPtr getOrSet(const Key & key, LoadFunc && load) { auto result = Base::getOrSet(key, load); - if (result.cache_miss) + if (result.second) ProfileEvents::increment(ProfileEvents::MarkCacheMisses); else ProfileEvents::increment(ProfileEvents::MarkCacheHits); - return result.value; + return result.first; } }; From 5a419a356b4e5f6475c84306ec828a0132fee9d5 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 15:29:59 +0800 Subject: [PATCH 15/28] fix a bug --- src/Common/LRUResourceCache.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 7250b44a055..2b1039ed9a5 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -51,6 +51,7 @@ public: { hits++; it->second.reference_count += 1; + queue.splice(queue.end(), queue, it->second.queue_iterator); return it->second.value; } misses++; From 5f85f7726650e6a4a621af20d17f5d0e64f6b988 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 15:34:20 +0800 Subject: [PATCH 16/28] add a new test case --- src/Common/tests/gtest_lru_resource_cache.cpp | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index 79317d83936..495b08137c4 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -83,6 +83,38 @@ TEST(LRUResourceCache, evict_on_weight) ASSERT_TRUE(val != nullptr); } +TEST(LRUResourceCache, evict_on_weight_v2) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(5, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + + val = mcache.acquire(2, load_int); + mcache.release(2); + + val = mcache.acquire(1); + mcache.release(1); + + x = 3; + val = mcache.acquire(3, load_int); + ASSERT_TRUE(val != nullptr); + + auto w = mcache.weight(); + ASSERT_EQ(w, 5); + auto n = mcache.size(); + ASSERT_EQ(n, 2); + + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val == nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val != nullptr); +} + TEST(LRUResourceCache, evict_on_size) { using MyCache = DB::LRUResourceCache; From 0d474069b5ccfb3f1075b6fd5b97f0d106245c0d Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 15:38:58 +0800 Subject: [PATCH 17/28] add a new test case --- src/Common/tests/gtest_lru_resource_cache.cpp | 32 +++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index 495b08137c4..f87855bf51d 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -115,6 +115,38 @@ TEST(LRUResourceCache, evict_on_weight_v2) ASSERT_TRUE(val != nullptr); } +TEST(LRUResourceCache, evict_on_weight_v3) +{ + using MyCache = DB::LRUResourceCache; + auto mcache = MyCache(5, 10); + int x = 2; + auto load_int = [&] { return std::make_shared(x); }; + auto val = mcache.acquire(1, load_int); + mcache.release(1); + + val = mcache.acquire(2, load_int); + mcache.release(2); + + val = mcache.acquire(1, load_int); + mcache.release(1); + + x = 3; + val = mcache.acquire(3, load_int); + ASSERT_TRUE(val != nullptr); + + auto w = mcache.weight(); + ASSERT_EQ(w, 5); + auto n = mcache.size(); + ASSERT_EQ(n, 2); + + val = mcache.acquire(1); + ASSERT_TRUE(val != nullptr); + val = mcache.acquire(2); + ASSERT_TRUE(val == nullptr); + val = mcache.acquire(3); + ASSERT_TRUE(val != nullptr); +} + TEST(LRUResourceCache, evict_on_size) { using MyCache = DB::LRUResourceCache; From 6aaeb285b0dbd6dba21d5ddf36fec85eb3ef2b22 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 15:44:24 +0800 Subject: [PATCH 18/28] update destructor --- src/Common/LRUResourceCache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 2b1039ed9a5..221ba225f2c 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -33,7 +33,7 @@ class LRUResourceCache { public: LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { } - virtual ~LRUResourceCache() { } + ~LRUResourceCache() = default; using Key = TKey; using Mapped = TMapped; using MappedPtr = std::shared_ptr; From 5a3b215f2448bdeace27a8a8b78fb5285d3ce73d Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 16:00:57 +0800 Subject: [PATCH 19/28] fixed code style --- src/Common/tests/gtest_lru_cache.cpp | 27 +++++++++++---------------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/src/Common/tests/gtest_lru_cache.cpp b/src/Common/tests/gtest_lru_cache.cpp index 42e404de379..8a8b2ff3de8 100644 --- a/src/Common/tests/gtest_lru_cache.cpp +++ b/src/Common/tests/gtest_lru_cache.cpp @@ -5,11 +5,11 @@ TEST(LRUCache, set) { - using SimpleLRUCache =DB::LRUCache; + using SimpleLRUCache = DB::LRUCache; auto lru_cache = SimpleLRUCache(10, 10); lru_cache.set(1, std::make_shared(2)); - lru_cache.set(2,std::make_shared(3)); - + lru_cache.set(2, std::make_shared(3)); + auto w = lru_cache.weight(); auto n = lru_cache.count(); ASSERT_EQ(w, 2); @@ -18,10 +18,10 @@ TEST(LRUCache, set) TEST(LRUCache, update) { - using SimpleLRUCache =DB::LRUCache; + using SimpleLRUCache = DB::LRUCache; auto lru_cache = SimpleLRUCache(10, 10); lru_cache.set(1, std::make_shared(2)); - lru_cache.set(1,std::make_shared(3)); + lru_cache.set(1, std::make_shared(3)); auto val = lru_cache.get(1); ASSERT_TRUE(val != nullptr); ASSERT_TRUE(*val == 3); @@ -29,7 +29,7 @@ TEST(LRUCache, update) TEST(LRUCache, get) { - using SimpleLRUCache =DB::LRUCache; + using SimpleLRUCache = DB::LRUCache; auto lru_cache = SimpleLRUCache(10, 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); @@ -44,16 +44,12 @@ TEST(LRUCache, get) struct ValueWeight { - size_t operator()(const size_t & x) const - { - return x; - } + size_t operator()(const size_t & x) const { return x; } }; TEST(LRUCache, evict_on_size) { - - using SimpleLRUCache =DB::LRUCache; + using SimpleLRUCache = DB::LRUCache; auto lru_cache = SimpleLRUCache(20, 3); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); @@ -69,8 +65,7 @@ TEST(LRUCache, evict_on_size) TEST(LRUCache, evict_on_weight) { - - using SimpleLRUCache =DB::LRUCache, ValueWeight>; + using SimpleLRUCache = DB::LRUCache, ValueWeight>; auto lru_cache = SimpleLRUCache(10, 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); @@ -91,10 +86,10 @@ TEST(LRUCache, evict_on_weight) TEST(LRUCache, getOrSet) { - using SimpleLRUCache =DB::LRUCache, ValueWeight>; + using SimpleLRUCache = DB::LRUCache, ValueWeight>; auto lru_cache = SimpleLRUCache(10, 10); size_t x = 10; - auto load_func = [&]{ return std::make_shared(x); }; + auto load_func = [&] { return std::make_shared(x); }; auto [value, loaded] = lru_cache.getOrSet(1, load_func); ASSERT_TRUE(value != nullptr); ASSERT_TRUE(*value == 10); From 6b6a82f3b9f707fc856393b12fe8fa521ac0e24f Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 18:18:38 +0800 Subject: [PATCH 20/28] add MappedHolder to get cache values --- src/Common/LRUResourceCache.h | 210 ++++++++++------ src/Common/tests/gtest_lru_resource_cache.cpp | 233 +++++++++--------- 2 files changed, 242 insertions(+), 201 deletions(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 221ba225f2c..d33280add25 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -32,102 +32,59 @@ template < class LRUResourceCache { public: - LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { } - ~LRUResourceCache() = default; using Key = TKey; using Mapped = TMapped; using MappedPtr = std::shared_ptr; - - // - load_func : when key is not exists in cache, load_func is called to generate a new key - // - return: is null when there is no more space for the new value or the old value is in used. - template - MappedPtr acquire(const Key & key, LoadFunc && load_func) + class MappedHolder { - InsertToken * insert_token = nullptr; + public: + ~MappedHolder() { - std::lock_guard lock(mutex); - auto it = cells.find(key); - if (it != cells.end()) - { - hits++; - it->second.reference_count += 1; - queue.splice(queue.end(), queue, it->second.queue_iterator); - return it->second.value; - } - misses++; - insert_token = acquireInsertToken(key); + cache->release(key); + } - Cell * cell_ptr = nullptr; + Mapped & value() { - std::lock_guard lock(insert_token->mutex); - if (!insert_token->value) - { - insert_token->value = load_func(); - std::lock_guard cell_lock(mutex); - cell_ptr = insert_value(key, insert_token->value); - if (cell_ptr) - { - cell_ptr->reference_count += 1; - } - else - { - insert_token->value = nullptr; - } - } + return *(val.get()); + } + static bool tryRemove(std::unique_ptr * holder_ptr) + { + auto & holder = *holder_ptr; + auto cache = holder->cache; + auto key = holder->key; + *holder_ptr = nullptr; + return cache->tryRemove(key); } - std::lock_guard lock(mutex); - releaseInsertToken(key); - if (cell_ptr) + MappedHolder(LRUResourceCache * cache_, const Key & key_, MappedPtr value_) : cache(cache_), key(key_), val(value_) { - return cell_ptr->value; } - return nullptr; - } + protected: + LRUResourceCache * cache; + Key key; + MappedPtr val; + }; + using MappedHolderPtr = std::unique_ptr; - MappedPtr acquire(const Key & key) + // use get() or getOrSet() to access the elements + MappedHolderPtr get(const Key & key) { - std::lock_guard lock(mutex); - auto it = cells.find(key); - if (it == cells.end()) - { - misses++; + auto mappedptr = getImpl(key); + if (!mappedptr) return nullptr; - } - hits++; - it->second.reference_count += 1; - queue.splice(queue.end(), queue, it->second.queue_iterator); - return it->second.value; + return std::make_unique(this, key, mappedptr); + } + template + MappedHolderPtr getOrSet(const Key & key, LoadFunc && load_func) + { + auto mappedptr = getImpl(key, load_func); + if (!mappedptr) + return nullptr; + return std::make_unique(this, key, mappedptr); } - // mark a reference is released - void release(const Key & key) - { - std::lock_guard lock(mutex); - auto it = cells.find(key); - if (it == cells.end() || it->second.reference_count == 0) - { - LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "try to release an invalid element"); - abort(); - } - it->second.reference_count -= 1; - } - - // If you want to update a value, call tryRemove() at first and then call acquire() with load_func. - bool tryRemove(const Key & key) - { - std::lock_guard guard(mutex); - auto it = cells.find(key); - if (it == cells.end()) - return true; - auto & cell = it->second; - if (cell.reference_count) - return false; - queue.erase(cell.queue_iterator); - current_weight -= cell.weight; - cells.erase(it); - return true; - } + LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { } + ~LRUResourceCache() = default; size_t weight() { @@ -181,6 +138,81 @@ private: std::atomic hits{0}; std::atomic misses{0}; std::atomic evict_count{0}; + + // - load_func : when key is not exists in cache, load_func is called to generate a new key + // - return: is null when there is no more space for the new value or the old value is in used. + template + MappedPtr getImpl(const Key & key, LoadFunc && load_func) + { + InsertToken * insert_token = nullptr; + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it != cells.end()) + { + hits++; + it->second.reference_count += 1; + queue.splice(queue.end(), queue, it->second.queue_iterator); + return it->second.value; + } + misses++; + insert_token = acquireInsertToken(key); + } + Cell * cell_ptr = nullptr; + { + std::lock_guard lock(insert_token->mutex); + if (!insert_token->value) + { + insert_token->value = load_func(); + std::lock_guard cell_lock(mutex); + cell_ptr = set(key, insert_token->value); + if (cell_ptr) + { + cell_ptr->reference_count += 1; + } + else + { + insert_token->value = nullptr; + } + } + } + + std::lock_guard lock(mutex); + releaseInsertToken(key); + if (cell_ptr) + { + return cell_ptr->value; + } + return nullptr; + } + + MappedPtr getImpl(const Key & key) + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it == cells.end()) + { + misses++; + return nullptr; + } + hits++; + it->second.reference_count += 1; + queue.splice(queue.end(), queue, it->second.queue_iterator); + return it->second.value; + } + + // mark a reference is released + void release(const Key & key) + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it == cells.end() || it->second.reference_count == 0) + { + LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "try to release an invalid element"); + abort(); + } + it->second.reference_count -= 1; + } InsertToken * acquireInsertToken(const Key & key) { @@ -201,7 +233,7 @@ private: } // key mustn't be in the cache - Cell * insert_value(const Key & insert_key, MappedPtr value) + Cell * set(const Key & insert_key, MappedPtr value) { auto weight = value ? weight_function(*value) : 0; auto queue_size = cells.size() + 1; @@ -251,5 +283,21 @@ private: new_cell.queue_iterator = queue.insert(queue.end(), insert_key); return &new_cell; } + + // If you want to update a value, call tryRemove() at first and then call acquire() with load_func. + bool tryRemove(const Key & key) + { + std::lock_guard guard(mutex); + auto it = cells.find(key); + if (it == cells.end()) + return true; + auto & cell = it->second; + if (cell.reference_count) + return false; + queue.erase(cell.queue_iterator); + current_weight -= cell.weight; + cells.erase(it); + return true; + } }; } diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index f87855bf51d..0d58fd571a2 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -3,21 +3,21 @@ #include #include -TEST(LRUResourceCache, acquire) +TEST(LRUResourceCache, get) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(10, 10); int x = 10; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); + auto holder1 = mcache.getOrSet(1, load_int); x = 11; - val = mcache.acquire(2, load_int); - ASSERT_TRUE(val != nullptr); - ASSERT_TRUE(*val == 11); + auto holder2 = mcache.getOrSet(2, load_int); + ASSERT_TRUE(holder2 != nullptr); + ASSERT_TRUE(holder2->value() == 11); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - ASSERT_TRUE(*val == 10); + auto holder3 = mcache.get(1); + ASSERT_TRUE(holder3 != nullptr); + ASSERT_TRUE(holder3->value() == 10); } TEST(LRUResourceCache, remove) @@ -26,27 +26,20 @@ TEST(LRUResourceCache, remove) auto mcache = MyCache(10, 10); int x = 10; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - x = 11; - val = mcache.acquire(2, load_int); + auto holder0 = mcache.getOrSet(1, load_int); + auto holder1 = mcache.getOrSet(1, load_int); - auto succ = mcache.tryRemove(3); - ASSERT_TRUE(succ); - - succ = mcache.tryRemove(1); + auto succ = MyCache::MappedHolder::tryRemove(&holder0); ASSERT_TRUE(!succ); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - ASSERT_TRUE(*val == 10); + holder0 = mcache.get(1); + ASSERT_TRUE(holder0 != nullptr); + ASSERT_TRUE(holder0->value() == 10); - mcache.release(1); - succ = mcache.tryRemove(1); - ASSERT_TRUE(!succ); - mcache.release(1); - succ = mcache.tryRemove(1); + holder0 = nullptr; + succ = MyCache::MappedHolder::tryRemove(&holder1); ASSERT_TRUE(succ); - val = mcache.acquire(1); - ASSERT_TRUE(val == nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 == nullptr); } struct MyWeight @@ -60,27 +53,27 @@ TEST(LRUResourceCache, evict_on_weight) auto mcache = MyCache(5, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); + auto holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; - val = mcache.acquire(2, load_int); - mcache.release(2); + auto holder2 = mcache.getOrSet(2, load_int); + holder2 = nullptr; x = 3; - val = mcache.acquire(3, load_int); - ASSERT_TRUE(val != nullptr); + auto holder3 = mcache.getOrSet(3, load_int); + ASSERT_TRUE(holder3 != nullptr); auto w = mcache.weight(); ASSERT_EQ(w, 5); auto n = mcache.size(); ASSERT_EQ(n, 2); - val = mcache.acquire(1); - ASSERT_TRUE(val == nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 == nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 != nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 != nullptr); } TEST(LRUResourceCache, evict_on_weight_v2) @@ -89,30 +82,30 @@ TEST(LRUResourceCache, evict_on_weight_v2) auto mcache = MyCache(5, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); + auto holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; - val = mcache.acquire(2, load_int); - mcache.release(2); + auto holder2 = mcache.getOrSet(2, load_int); + holder2 = nullptr; - val = mcache.acquire(1); - mcache.release(1); + holder1 = mcache.get(1); + holder1 = nullptr; x = 3; - val = mcache.acquire(3, load_int); - ASSERT_TRUE(val != nullptr); + auto holder3 = mcache.getOrSet(3, load_int); + ASSERT_TRUE(holder3 != nullptr); auto w = mcache.weight(); ASSERT_EQ(w, 5); auto n = mcache.size(); ASSERT_EQ(n, 2); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val == nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 == nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 != nullptr); } TEST(LRUResourceCache, evict_on_weight_v3) @@ -121,30 +114,30 @@ TEST(LRUResourceCache, evict_on_weight_v3) auto mcache = MyCache(5, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); + auto holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; - val = mcache.acquire(2, load_int); - mcache.release(2); + auto holder2 = mcache.getOrSet(2, load_int); + holder2 = nullptr; - val = mcache.acquire(1, load_int); - mcache.release(1); + holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; x = 3; - val = mcache.acquire(3, load_int); - ASSERT_TRUE(val != nullptr); + auto holder3 = mcache.getOrSet(3, load_int); + ASSERT_TRUE(holder3 != nullptr); auto w = mcache.weight(); ASSERT_EQ(w, 5); auto n = mcache.size(); ASSERT_EQ(n, 2); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val == nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 == nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 != nullptr); } TEST(LRUResourceCache, evict_on_size) @@ -153,27 +146,27 @@ TEST(LRUResourceCache, evict_on_size) auto mcache = MyCache(5, 2); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); + auto holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; - val = mcache.acquire(2, load_int); - mcache.release(2); + auto holder2 = mcache.getOrSet(2, load_int); + holder2 = nullptr; x = 3; - val = mcache.acquire(3, load_int); - ASSERT_TRUE(val != nullptr); + auto holder3 = mcache.getOrSet(3, load_int); + ASSERT_TRUE(holder3 != nullptr); auto n = mcache.size(); ASSERT_EQ(n, 2); auto w = mcache.weight(); ASSERT_EQ(w, 2); - val = mcache.acquire(1); - ASSERT_TRUE(val == nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 == nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 != nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 != nullptr); } TEST(LRUResourceCache, not_evict_used_element) @@ -182,95 +175,95 @@ TEST(LRUResourceCache, not_evict_used_element) auto mcache = MyCache(7, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); + auto holder1 = mcache.getOrSet(1, load_int); - val = mcache.acquire(2, load_int); - mcache.release(2); + auto holder2 = mcache.getOrSet(2, load_int); + holder2 = nullptr; - val = mcache.acquire(3, load_int); - mcache.release(3); + auto holder3 = mcache.getOrSet(3, load_int); + holder3 = nullptr; x = 3; - val = mcache.acquire(4, load_int); - ASSERT_TRUE(val != nullptr); + auto holder4 = mcache.getOrSet(4, load_int); + ASSERT_TRUE(holder4 != nullptr); auto n = mcache.size(); ASSERT_EQ(n, 3); auto w = mcache.weight(); ASSERT_EQ(w, 7); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val == nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(4); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 == nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 != nullptr); + holder4 = mcache.get(4); + ASSERT_TRUE(holder4 != nullptr); } -TEST(LRUResourceCache, acquire_fail) +TEST(LRUResourceCache, get_fail) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - val = mcache.acquire(2, load_int); - val = mcache.acquire(3, load_int); - ASSERT_TRUE(val == nullptr); + auto holder1 = mcache.getOrSet(1, load_int); + auto holder2 = mcache.getOrSet(2, load_int); + auto holder3 = mcache.getOrSet(3, load_int); + ASSERT_TRUE(holder3 == nullptr); auto n = mcache.size(); ASSERT_EQ(n, 2); auto w = mcache.weight(); ASSERT_EQ(w, 4); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(2); - ASSERT_TRUE(val != nullptr); - val = mcache.acquire(3); - ASSERT_TRUE(val == nullptr); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + holder2 = mcache.get(2); + ASSERT_TRUE(holder2 != nullptr); + holder3 = mcache.get(3); + ASSERT_TRUE(holder3 == nullptr); } -TEST(LRUResourceCache, dup_acquire) +TEST(LRUResourceCache, dup_get) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(20, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); + auto holder1 = mcache.getOrSet(1, load_int); + holder1 = nullptr; x = 11; - val = mcache.acquire(1, load_int); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.getOrSet(1, load_int); + ASSERT_TRUE(holder1 != nullptr); auto n = mcache.size(); ASSERT_EQ(n, 1); auto w = mcache.weight(); ASSERT_EQ(w, 2); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - ASSERT_TRUE(*val == 2); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + ASSERT_TRUE(holder1->value() == 2); } -TEST(LRUResourceCache, re_acquire) +TEST(LRUResourceCache, re_get) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(20, 10); int x = 2; auto load_int = [&] { return std::make_shared(x); }; - auto val = mcache.acquire(1, load_int); - mcache.release(1); - mcache.tryRemove(1); + auto holder1 = mcache.getOrSet(1, load_int); + MyCache::MappedHolder::tryRemove(&holder1); + x = 11; - val = mcache.acquire(1, load_int); - ASSERT_TRUE(val != nullptr); + holder1 = mcache.getOrSet(1, load_int); + ASSERT_TRUE(holder1 != nullptr); auto n = mcache.size(); ASSERT_EQ(n, 1); auto w = mcache.weight(); ASSERT_EQ(w, 11); - val = mcache.acquire(1); - ASSERT_TRUE(val != nullptr); - ASSERT_TRUE(*val == 11); + holder1 = mcache.get(1); + ASSERT_TRUE(holder1 != nullptr); + ASSERT_TRUE(holder1->value() == 11); } From a41273826448b6b278d5d6d2214de78005b42be5 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Wed, 29 Dec 2021 18:27:52 +0800 Subject: [PATCH 21/28] format code style --- src/Common/LRUResourceCache.h | 22 +++++++--------------- 1 file changed, 7 insertions(+), 15 deletions(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index d33280add25..3dd078bec5a 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -38,15 +38,8 @@ public: class MappedHolder { public: - ~MappedHolder() - { - cache->release(key); - - } - Mapped & value() - { - return *(val.get()); - } + ~MappedHolder() { cache->release(key); } + Mapped & value() { return *(val.get()); } static bool tryRemove(std::unique_ptr * holder_ptr) { auto & holder = *holder_ptr; @@ -56,9 +49,8 @@ public: return cache->tryRemove(key); } - MappedHolder(LRUResourceCache * cache_, const Key & key_, MappedPtr value_) : cache(cache_), key(key_), val(value_) - { - } + MappedHolder(LRUResourceCache * cache_, const Key & key_, MappedPtr value_) : cache(cache_), key(key_), val(value_) { } + protected: LRUResourceCache * cache; Key key; @@ -74,7 +66,7 @@ public: return nullptr; return std::make_unique(this, key, mappedptr); } - template + template MappedHolderPtr getOrSet(const Key & key, LoadFunc && load_func) { auto mappedptr = getImpl(key, load_func); @@ -138,7 +130,7 @@ private: std::atomic hits{0}; std::atomic misses{0}; std::atomic evict_count{0}; - + // - load_func : when key is not exists in cache, load_func is called to generate a new key // - return: is null when there is no more space for the new value or the old value is in used. template @@ -283,7 +275,7 @@ private: new_cell.queue_iterator = queue.insert(queue.end(), insert_key); return &new_cell; } - + // If you want to update a value, call tryRemove() at first and then call acquire() with load_func. bool tryRemove(const Key & key) { From 88383b67152a63b21ee80a1332e1b1eb04d23609 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 10:08:33 +0800 Subject: [PATCH 22/28] some change --- src/Common/LRUResourceCache.h | 84 +++++++++++-------- src/Common/tests/gtest_lru_resource_cache.cpp | 19 +++-- 2 files changed, 58 insertions(+), 45 deletions(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 3dd078bec5a..192beb5b803 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -39,16 +39,7 @@ public: { public: ~MappedHolder() { cache->release(key); } - Mapped & value() { return *(val.get()); } - static bool tryRemove(std::unique_ptr * holder_ptr) - { - auto & holder = *holder_ptr; - auto cache = holder->cache; - auto key = holder->key; - *holder_ptr = nullptr; - return cache->tryRemove(key); - } - + Mapped & value() { return *val.get(); } MappedHolder(LRUResourceCache * cache_, const Key & key_, MappedPtr value_) : cache(cache_), key(key_), val(value_) { } protected: @@ -61,10 +52,10 @@ public: // use get() or getOrSet() to access the elements MappedHolderPtr get(const Key & key) { - auto mappedptr = getImpl(key); - if (!mappedptr) + auto mapped_ptr = getImpl(key); + if (!mapped_ptr) return nullptr; - return std::make_unique(this, key, mappedptr); + return std::make_unique(this, key, mapped_ptr); } template MappedHolderPtr getOrSet(const Key & key, LoadFunc && load_func) @@ -75,6 +66,24 @@ public: return std::make_unique(this, key, mappedptr); } + // If the key's reference_count = 0, delete it immediately. otherwise, mark it expired, and delete in release + void tryRemove(const Key & key) + { + std::lock_guard lock(mutex); + auto it = cells.find(key); + if (it == cells.end()) + return; + auto & cell = it->second; + if (cell.reference_count == 0) + { + queue.erase(cell.queue_iterator); + current_weight -= cell.weight; + cells.erase(it); + } + else + cell.expired = true; + } + LRUResourceCache(size_t max_weight_, size_t max_element_size_ = 0) : max_weight(max_weight_), max_element_size(max_element_size_) { } ~LRUResourceCache() = default; @@ -109,6 +118,7 @@ private: size_t weight = 0; LRUQueueIterator queue_iterator; size_t reference_count = 0; + bool expired = false; }; using Cells = std::unordered_map; @@ -140,12 +150,23 @@ private: { std::lock_guard lock(mutex); auto it = cells.find(key); - if (it != cells.end()) + if (it != cells.end() && !it->second.expired) { - hits++; - it->second.reference_count += 1; - queue.splice(queue.end(), queue, it->second.queue_iterator); - return it->second.value; + if (!it->second.expired) + { + hits++; + it->second.reference_count += 1; + queue.splice(queue.end(), queue, it->second.queue_iterator); + return it->second.value; + } + else if (it->second.reference_count > 0) + return nullptr; + else + { + // should not reach here + LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "element is in invalid status."); + abort(); + } } misses++; insert_token = acquireInsertToken(key); @@ -182,7 +203,7 @@ private: { std::lock_guard lock(mutex); auto it = cells.find(key); - if (it == cells.end()) + if (it == cells.end() || it->second.expired) { misses++; return nullptr; @@ -203,7 +224,14 @@ private: LOG_ERROR(&Poco::Logger::get("LRUResourceCache"), "try to release an invalid element"); abort(); } - it->second.reference_count -= 1; + auto & cell = it->second; + cell.reference_count -= 1; + if (cell.expired && cell.reference_count == 0) + { + queue.erase(cell.queue_iterator); + current_weight -= cell.weight; + cells.erase(it); + } } InsertToken * acquireInsertToken(const Key & key) @@ -275,21 +303,5 @@ private: new_cell.queue_iterator = queue.insert(queue.end(), insert_key); return &new_cell; } - - // If you want to update a value, call tryRemove() at first and then call acquire() with load_func. - bool tryRemove(const Key & key) - { - std::lock_guard guard(mutex); - auto it = cells.find(key); - if (it == cells.end()) - return true; - auto & cell = it->second; - if (cell.reference_count) - return false; - queue.erase(cell.queue_iterator); - current_weight -= cell.weight; - cells.erase(it); - return true; - } }; } diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index 0d58fd571a2..0eb0ec95476 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -29,17 +29,16 @@ TEST(LRUResourceCache, remove) auto holder0 = mcache.getOrSet(1, load_int); auto holder1 = mcache.getOrSet(1, load_int); - auto succ = MyCache::MappedHolder::tryRemove(&holder0); - ASSERT_TRUE(!succ); + mcache.tryRemove(1); holder0 = mcache.get(1); - ASSERT_TRUE(holder0 != nullptr); - ASSERT_TRUE(holder0->value() == 10); + ASSERT_TRUE(holder0 == nullptr); + auto n = mcache.size(); + ASSERT_TRUE(n == 1); holder0 = nullptr; - succ = MyCache::MappedHolder::tryRemove(&holder1); - ASSERT_TRUE(succ); - holder1 = mcache.get(1); - ASSERT_TRUE(holder1 == nullptr); + holder1 = nullptr; + n = mcache.size(); + ASSERT_TRUE(n == 0); } struct MyWeight @@ -253,9 +252,10 @@ TEST(LRUResourceCache, re_get) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - MyCache::MappedHolder::tryRemove(&holder1); + mcache.tryRemove(1); x = 11; + holder1 = nullptr; holder1 = mcache.getOrSet(1, load_int); ASSERT_TRUE(holder1 != nullptr); @@ -267,3 +267,4 @@ TEST(LRUResourceCache, re_get) ASSERT_TRUE(holder1 != nullptr); ASSERT_TRUE(holder1->value() == 11); } + From 54d78bde0dfe484c43571611e92cd4aa034f7a33 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 10:15:56 +0800 Subject: [PATCH 23/28] update comments --- src/Common/LRUResourceCache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 192beb5b803..8117e37b844 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -141,7 +141,7 @@ private: std::atomic misses{0}; std::atomic evict_count{0}; - // - load_func : when key is not exists in cache, load_func is called to generate a new key + // - load_func : when key is not exists in cache, load_func is called to generate a new value // - return: is null when there is no more space for the new value or the old value is in used. template MappedPtr getImpl(const Key & key, LoadFunc && load_func) From 38605d6be3a3ad19952624ffca4c94885cefe930 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 12:20:14 +0800 Subject: [PATCH 24/28] rename test cases --- src/Common/tests/gtest_lru_resource_cache.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index 0eb0ec95476..fcdd2902522 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -46,7 +46,7 @@ struct MyWeight size_t operator()(const int & x) const { return static_cast(x); } }; -TEST(LRUResourceCache, evict_on_weight) +TEST(LRUResourceCache, evictOnWweight) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 10); @@ -75,7 +75,7 @@ TEST(LRUResourceCache, evict_on_weight) ASSERT_TRUE(holder3 != nullptr); } -TEST(LRUResourceCache, evict_on_weight_v2) +TEST(LRUResourceCache, evictOnWeightV2) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 10); @@ -107,7 +107,7 @@ TEST(LRUResourceCache, evict_on_weight_v2) ASSERT_TRUE(holder3 != nullptr); } -TEST(LRUResourceCache, evict_on_weight_v3) +TEST(LRUResourceCache, evictOnWeightV3) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 10); @@ -139,7 +139,7 @@ TEST(LRUResourceCache, evict_on_weight_v3) ASSERT_TRUE(holder3 != nullptr); } -TEST(LRUResourceCache, evict_on_size) +TEST(LRUResourceCache, evictOnSize) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 2); @@ -168,7 +168,7 @@ TEST(LRUResourceCache, evict_on_size) ASSERT_TRUE(holder3 != nullptr); } -TEST(LRUResourceCache, not_evict_used_element) +TEST(LRUResourceCache, notEvictUsedElement) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(7, 10); @@ -201,7 +201,7 @@ TEST(LRUResourceCache, not_evict_used_element) ASSERT_TRUE(holder4 != nullptr); } -TEST(LRUResourceCache, get_fail) +TEST(LRUResourceCache, getFail) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(5, 10); @@ -224,7 +224,7 @@ TEST(LRUResourceCache, get_fail) ASSERT_TRUE(holder3 == nullptr); } -TEST(LRUResourceCache, dup_get) +TEST(LRUResourceCache, dupGet) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(20, 10); @@ -245,7 +245,7 @@ TEST(LRUResourceCache, dup_get) ASSERT_TRUE(holder1->value() == 2); } -TEST(LRUResourceCache, re_get) +TEST(LRUResourceCache, reGet) { using MyCache = DB::LRUResourceCache; auto mcache = MyCache(20, 10); From e8c9079bb89a32d7bed5d890c096c7147aa420ce Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 14:34:08 +0800 Subject: [PATCH 25/28] rename test cases --- src/Common/tests/gtest_lru_cache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Common/tests/gtest_lru_cache.cpp b/src/Common/tests/gtest_lru_cache.cpp index 8a8b2ff3de8..7694a76ea72 100644 --- a/src/Common/tests/gtest_lru_cache.cpp +++ b/src/Common/tests/gtest_lru_cache.cpp @@ -47,7 +47,7 @@ struct ValueWeight size_t operator()(const size_t & x) const { return x; } }; -TEST(LRUCache, evict_on_size) +TEST(LRUCache, evictOnSize) { using SimpleLRUCache = DB::LRUCache; auto lru_cache = SimpleLRUCache(20, 3); @@ -63,7 +63,7 @@ TEST(LRUCache, evict_on_size) ASSERT_TRUE(value == nullptr); } -TEST(LRUCache, evict_on_weight) +TEST(LRUCache, evictOnWeight) { using SimpleLRUCache = DB::LRUCache, ValueWeight>; auto lru_cache = SimpleLRUCache(10, 10); From 77084f53496d4a4eb8ff7b0a31ebe2935158cd29 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 18:43:12 +0800 Subject: [PATCH 26/28] add token holder --- src/Common/LRUResourceCache.h | 114 +++++++++++++++++++++++++++------- 1 file changed, 90 insertions(+), 24 deletions(-) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 8117e37b844..76280497310 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -60,10 +60,10 @@ public: template MappedHolderPtr getOrSet(const Key & key, LoadFunc && load_func) { - auto mappedptr = getImpl(key, load_func); - if (!mappedptr) + auto mapped_ptr = getImpl(key, load_func); + if (!mapped_ptr) return nullptr; - return std::make_unique(this, key, mappedptr); + return std::make_unique(this, key, mapped_ptr); } // If the key's reference_count = 0, delete it immediately. otherwise, mark it expired, and delete in release @@ -128,14 +128,70 @@ private: size_t max_weight = 0; size_t max_element_size = 0; + /// Represents pending insertion attempt. struct InsertToken { + explicit InsertToken(LRUResourceCache & cache_) : cache(cache_) { } + std::mutex mutex; - MappedPtr value; - size_t reference_count = 0; + bool cleaned_up = false; /// Protected by the token mutex + MappedPtr value; /// Protected by the token mutex + + LRUResourceCache & cache; + size_t refcount = 0; /// Protected by the cache mutex }; - using InsertTokens = std::unordered_map; - InsertTokens insert_tokens; + + using InsertTokenById = std::unordered_map, HashFunction>; + + /// This class is responsible for removing used insert tokens from the insert_tokens map. + /// Among several concurrent threads the first successful one is responsible for removal. But if they all + /// fail, then the last one is responsible. + struct InsertTokenHolder + { + const Key * key = nullptr; + std::shared_ptr token; + bool cleaned_up = false; + + InsertTokenHolder() = default; + + void + acquire(const Key * key_, const std::shared_ptr & token_, [[maybe_unused]] std::lock_guard & cache_lock) + { + key = key_; + token = token_; + ++token->refcount; + } + + void cleanup([[maybe_unused]] std::lock_guard & token_lock, [[maybe_unused]] std::lock_guard & cache_lock) + { + token->cache.insert_tokens.erase(*key); + token->cleaned_up = true; + cleaned_up = true; + } + + ~InsertTokenHolder() + { + if (!token) + return; + + if (cleaned_up) + return; + + std::lock_guard token_lock(token->mutex); + + if (token->cleaned_up) + return; + + std::lock_guard cache_lock(token->cache.mutex); + + --token->refcount; + if (token->refcount == 0) + cleanup(token_lock, cache_lock); + } + }; + + friend struct InsertTokenHolder; + InsertTokenById insert_tokens; WeightFunction weight_function; std::atomic hits{0}; std::atomic misses{0}; @@ -146,7 +202,7 @@ private: template MappedPtr getImpl(const Key & key, LoadFunc && load_func) { - InsertToken * insert_token = nullptr; + InsertTokenHolder token_holder; { std::lock_guard lock(mutex); auto it = cells.find(key); @@ -169,31 +225,41 @@ private: } } misses++; - insert_token = acquireInsertToken(key); + auto & token = insert_tokens[key]; + if (!token) + token = std::make_shared(*this); + token_holder.acquire(&key, token, lock); } + + auto * token = token_holder.token.get(); + std::lock_guard token_lock(token->mutex); + token_holder.cleaned_up = token->cleaned_up; + + if (!token->value) + token->value = load_func(); + + std::lock_guard lock(mutex); + auto token_it = insert_tokens.find(key); Cell * cell_ptr = nullptr; + if (token_it != insert_tokens.end() && token_it->second.get() == token) { - std::lock_guard lock(insert_token->mutex); - if (!insert_token->value) + cell_ptr = set(key, token->value); + } + else + { + auto cell_it = cells.find(key); + if (cell_it != cells.end() && !cell_it->second.expired) { - insert_token->value = load_func(); - std::lock_guard cell_lock(mutex); - cell_ptr = set(key, insert_token->value); - if (cell_ptr) - { - cell_ptr->reference_count += 1; - } - else - { - insert_token->value = nullptr; - } + cell_ptr = &cell_it->second; } } - std::lock_guard lock(mutex); - releaseInsertToken(key); + if (!token->cleaned_up) + token_holder.cleanup(token_lock, lock); + if (cell_ptr) { + cell_ptr->reference_count++; return cell_ptr->value; } return nullptr; From e6528ca0eff25af7a2af102deaf6c6ac53bbecb0 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 20:38:58 +0800 Subject: [PATCH 27/28] fix a bug in LRUCache::remove() --- src/Common/LRUCache.h | 1 + src/Common/tests/gtest_lru_resource_cache.cpp | 32 +++++++++---------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/src/Common/LRUCache.h b/src/Common/LRUCache.h index 1058003a327..480a03ab399 100644 --- a/src/Common/LRUCache.h +++ b/src/Common/LRUCache.h @@ -72,6 +72,7 @@ public: return; auto & cell = it->second; current_size -= cell.size; + queue.erase(cell.queue_iterator); cells.erase(it); } diff --git a/src/Common/tests/gtest_lru_resource_cache.cpp b/src/Common/tests/gtest_lru_resource_cache.cpp index fcdd2902522..f88eded531e 100644 --- a/src/Common/tests/gtest_lru_resource_cache.cpp +++ b/src/Common/tests/gtest_lru_resource_cache.cpp @@ -35,8 +35,8 @@ TEST(LRUResourceCache, remove) auto n = mcache.size(); ASSERT_TRUE(n == 1); - holder0 = nullptr; - holder1 = nullptr; + holder0.reset(); + holder1.reset(); n = mcache.size(); ASSERT_TRUE(n == 0); } @@ -53,10 +53,10 @@ TEST(LRUResourceCache, evictOnWweight) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); auto holder2 = mcache.getOrSet(2, load_int); - holder2 = nullptr; + holder2.reset(); x = 3; auto holder3 = mcache.getOrSet(3, load_int); @@ -82,13 +82,13 @@ TEST(LRUResourceCache, evictOnWeightV2) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); auto holder2 = mcache.getOrSet(2, load_int); - holder2 = nullptr; + holder2.reset(); holder1 = mcache.get(1); - holder1 = nullptr; + holder1.reset(); x = 3; auto holder3 = mcache.getOrSet(3, load_int); @@ -114,13 +114,13 @@ TEST(LRUResourceCache, evictOnWeightV3) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); auto holder2 = mcache.getOrSet(2, load_int); - holder2 = nullptr; + holder2.reset(); holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); x = 3; auto holder3 = mcache.getOrSet(3, load_int); @@ -146,10 +146,10 @@ TEST(LRUResourceCache, evictOnSize) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); auto holder2 = mcache.getOrSet(2, load_int); - holder2 = nullptr; + holder2.reset(); x = 3; auto holder3 = mcache.getOrSet(3, load_int); @@ -177,10 +177,10 @@ TEST(LRUResourceCache, notEvictUsedElement) auto holder1 = mcache.getOrSet(1, load_int); auto holder2 = mcache.getOrSet(2, load_int); - holder2 = nullptr; + holder2.reset(); auto holder3 = mcache.getOrSet(3, load_int); - holder3 = nullptr; + holder3.reset(); x = 3; auto holder4 = mcache.getOrSet(4, load_int); @@ -231,7 +231,7 @@ TEST(LRUResourceCache, dupGet) int x = 2; auto load_int = [&] { return std::make_shared(x); }; auto holder1 = mcache.getOrSet(1, load_int); - holder1 = nullptr; + holder1.reset(); x = 11; holder1 = mcache.getOrSet(1, load_int); ASSERT_TRUE(holder1 != nullptr); @@ -255,7 +255,7 @@ TEST(LRUResourceCache, reGet) mcache.tryRemove(1); x = 11; - holder1 = nullptr; + holder1.reset(); holder1 = mcache.getOrSet(1, load_int); ASSERT_TRUE(holder1 != nullptr); From 9c32723dcf4c141883017e148a6f6f4b015aebf4 Mon Sep 17 00:00:00 2001 From: lgbo-ustc Date: Thu, 30 Dec 2021 21:48:54 +0800 Subject: [PATCH 28/28] fix a bug in LRUResourceCache::getImpl() --- src/Common/LRUResourceCache.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Common/LRUResourceCache.h b/src/Common/LRUResourceCache.h index 76280497310..b872d649a57 100644 --- a/src/Common/LRUResourceCache.h +++ b/src/Common/LRUResourceCache.h @@ -259,6 +259,7 @@ private: if (cell_ptr) { + queue.splice(queue.end(), queue, cell_ptr->queue_iterator); cell_ptr->reference_count++; return cell_ptr->value; }