diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp
index 8e092bdf8e4..5768e744f94 100644
--- a/programs/local/LocalServer.cpp
+++ b/programs/local/LocalServer.cpp
@@ -600,13 +600,13 @@ void LocalServer::processConfig()
     String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
     size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
     if (uncompressed_cache_size)
-        global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
+        global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);

     /// Size of cache for marks (index of MergeTree family of tables).
     String mark_cache_policy = config().getString("mark_cache_policy", "");
     size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
     if (mark_cache_size)
-        global_context->setMarkCache(mark_cache_size, mark_cache_policy);
+        global_context->setMarkCache(mark_cache_policy, mark_cache_size);

     /// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
     size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 711dfb3820a..5d172aa4f82 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -1456,7 +1456,7 @@ try
         LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
             formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
     }
-    global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
+    global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);

     /// Load global settings from default_profile and system_profile.
     global_context->setDefaultProfiles(config());
@@ -1481,7 +1481,7 @@ try
         LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
             formatReadableSizeWithBinarySuffix(mark_cache_size));
     }
-    global_context->setMarkCache(mark_cache_size, mark_cache_policy);
+    global_context->setMarkCache(mark_cache_policy, mark_cache_size);

     if (server_settings.index_uncompressed_cache_size)
         global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);
diff --git a/src/Common/CacheBase.h b/src/Common/CacheBase.h
index 8145bdf95b5..4ae313d7ecf 100644
--- a/src/Common/CacheBase.h
+++ b/src/Common/CacheBase.h
@@ -27,39 +27,55 @@ namespace ErrorCodes
 /// (default policy evicts entries which are not used for a long time).
 /// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
 /// of that value.
-/// Cache starts to evict entries when their total weight exceeds max_size.
+/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes.
 /// Value weight should not change after insertion.
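// A minimal sketch of the weight-based eviction described above, modelled on the ValueWeight
// functor in the unit tests further down (the template arguments and cache limits here are
// illustrative, not taken from the patch):

#include <Common/CacheBase.h>
#include <functional>
#include <memory>

struct ValueWeight
{
    size_t operator()(const size_t & x) const { return x; }   /// weight == the stored value itself
};

void weight_example()
{
    /// Evicts once the summed weights exceed 10 or more than 10 entries are stored.
    DB::CacheBase<int, size_t, std::hash<int>, ValueWeight> cache("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);

    cache.set(1, std::make_shared<size_t>(2));
    cache.set(2, std::make_shared<size_t>(3));
    cache.set(3, std::make_shared<size_t>(8));   /// total weight 13 > 10: keys 1 and 2 are evicted until the rest (8) fits
}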
-template , typename WeightFunction = TrivialWeightFunction> +template , typename WeightFunction = EqualWeightFunction> class CacheBase { -public: - using Key = TKey; - using Mapped = TMapped; - using MappedPtr = std::shared_ptr; +private: + using CachePolicy = ICachePolicy; - explicit CacheBase(size_t max_size, size_t max_elements_size = 0, String cache_policy_name = "", double size_ratio = 0.5) +public: + using Key = typename CachePolicy::Key; + using Mapped = typename CachePolicy::Mapped; + using MappedPtr = typename CachePolicy::MappedPtr; + using KeyMapped = typename CachePolicy::KeyMapped; + + /// Use this ctor if you don't care about the internal cache policy. + explicit CacheBase(size_t max_size_in_bytes, size_t max_count = 0, double size_ratio = 0.5) + : CacheBase("SLRU", max_size_in_bytes, max_count, size_ratio) + { + } + + /// Use this ctor if you want the user to configure the cache policy via some setting. Supports only general-purpose policies LRU and SLRU. + explicit CacheBase(std::string_view cache_policy_name, size_t max_size_in_bytes, size_t max_count = 0, double size_ratio = 0.5) { auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); }; + static constexpr std::string_view default_cache_policy = "SLRU"; + if (cache_policy_name.empty()) - cache_policy_name = default_cache_policy_name; + cache_policy_name = default_cache_policy; if (cache_policy_name == "LRU") { using LRUPolicy = LRUCachePolicy; - cache_policy = std::make_unique(max_size, max_elements_size, on_weight_loss_function); + cache_policy = std::make_unique(max_size_in_bytes, max_count, on_weight_loss_function); } else if (cache_policy_name == "SLRU") { using SLRUPolicy = SLRUCachePolicy; - cache_policy = std::make_unique(max_size, max_elements_size, size_ratio, on_weight_loss_function); + cache_policy = std::make_unique(max_size_in_bytes, max_count, size_ratio, on_weight_loss_function); } else - { - throw Exception(ErrorCodes::BAD_ARGUMENTS, "Undeclared cache policy name: {}", cache_policy_name); - } + throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown cache policy name: {}", cache_policy_name); } + /// Use this ctor to provide an arbitrary cache policy. 
+ explicit CacheBase(std::unique_ptr> cache_policy_) + : cache_policy(std::move(cache_policy_)) + {} + MappedPtr get(const Key & key) { std::lock_guard lock(mutex); @@ -68,7 +84,17 @@ public: ++hits; else ++misses; + return res; + } + std::optional getWithKey(const Key & key) + { + std::lock_guard lock(mutex); + auto res = cache_policy->getWithKey(key, lock); + if (res.has_value()) + ++hits; + else + ++misses; return res; } @@ -147,6 +173,12 @@ public: out_misses = misses; } + std::vector dump() const + { + std::lock_guard lock(mutex); + return cache_policy->dump(); + } + void reset() { std::lock_guard lock(mutex); @@ -175,9 +207,21 @@ public: } size_t maxSize() const - TSA_NO_THREAD_SAFETY_ANALYSIS // disabled because max_size of cache_policy is a constant parameter { - return cache_policy->maxSize(); + std::lock_guard lock(mutex); + return cache_policy->maxSize(lock); + } + + void setMaxCount(size_t max_count) + { + std::lock_guard lock(mutex); + return cache_policy->setMaxCount(max_count, lock); + } + + void setMaxSize(size_t max_size_in_bytes) + { + std::lock_guard lock(mutex); + return cache_policy->setMaxSize(max_size_in_bytes, lock); } virtual ~CacheBase() = default; @@ -186,12 +230,8 @@ protected: mutable std::mutex mutex; private: - using CachePolicy = ICachePolicy; - std::unique_ptr cache_policy TSA_GUARDED_BY(mutex); - inline static const String default_cache_policy_name = "SLRU"; - std::atomic hits{0}; std::atomic misses{0}; diff --git a/src/Common/ICachePolicy.h b/src/Common/ICachePolicy.h index 4e5916f125e..dca82095af1 100644 --- a/src/Common/ICachePolicy.h +++ b/src/Common/ICachePolicy.h @@ -1,13 +1,21 @@ #pragma once +#include + #include #include #include namespace DB { + +namespace ErrorCodes +{ + extern const int NOT_IMPLEMENTED; +} + template -struct TrivialWeightFunction +struct EqualWeightFunction { size_t operator()(const T &) const { @@ -15,7 +23,7 @@ struct TrivialWeightFunction } }; -template , typename WeightFunction = TrivialWeightFunction> +template , typename WeightFunction = EqualWeightFunction> class ICachePolicy { public: @@ -24,19 +32,33 @@ public: using MappedPtr = std::shared_ptr; using OnWeightLossFunction = std::function; + struct KeyMapped + { + Key key; + MappedPtr mapped; + }; + virtual size_t weight(std::lock_guard & /* cache_lock */) const = 0; virtual size_t count(std::lock_guard & /* cache_lock */) const = 0; - virtual size_t maxSize() const = 0; + virtual size_t maxSize(std::lock_guard& /* cache_lock */) const = 0; - virtual void reset(std::lock_guard & /* cache_lock */) = 0; - virtual void remove(const Key & key, std::lock_guard & /* cache_lock */) = 0; + virtual void setMaxCount(size_t /*max_count*/, std::lock_guard & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); } + virtual void setMaxSize(size_t /*max_size_in_bytes*/, std::lock_guard & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); } + + /// HashFunction usually hashes the entire key and the found key will be equal the provided key. In such cases, use get(). It is also + /// possible to store other, non-hashed data in the key. In that case, the found key is potentially different from the provided key. + /// Then use getWithKey() to also return the found key including it's non-hashed data. 
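// A contrived sketch of that situation (not from the patch -- the query cache below is the real
// user: its hash covers only the query AST, while username and expiry time ride along in the key).
// "Entry", "EntryHasher" and the literal values are illustrative:

#include <Common/CacheBase.h>
#include <cassert>
#include <functional>
#include <memory>
#include <string>

struct Entry
{
    std::string query;      /// participates in hashing and equality
    std::string username;   /// metadata only -- ignored by hash and equality
    bool operator==(const Entry & other) const { return query == other.query; }
};

struct EntryHasher
{
    size_t operator()(const Entry & entry) const { return std::hash<std::string>()(entry.query); }
};

void get_with_key_example()
{
    DB::CacheBase<Entry, int, EntryHasher> cache(/*max_size_in_bytes*/ 1024);

    cache.set(Entry{"SELECT 1", "alice"}, std::make_shared<int>(42));

    /// The probe key carries a different username, yet the lookup succeeds because only the query
    /// string is hashed/compared. getWithKey() hands back the *stored* key, so the caller can still
    /// see that the entry belongs to "alice".
    if (auto res = cache.getWithKey(Entry{"SELECT 1", "bob"}))
        assert(res->key.username == "alice" && *res->mapped == 42);
}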
virtual MappedPtr get(const Key & key, std::lock_guard & /* cache_lock */) = 0; + virtual std::optional getWithKey(const Key &, std::lock_guard & /*cache_lock*/) = 0; + virtual void set(const Key & key, const MappedPtr & mapped, std::lock_guard & /* cache_lock */) = 0; - virtual ~ICachePolicy() = default; + virtual void remove(const Key & key, std::lock_guard & /* cache_lock */) = 0; -protected: - OnWeightLossFunction on_weight_loss_function = [](size_t) {}; + virtual void reset(std::lock_guard & /* cache_lock */) = 0; + virtual std::vector dump() const = 0; + + virtual ~ICachePolicy() = default; }; } diff --git a/src/Common/LRUCachePolicy.h b/src/Common/LRUCachePolicy.h index b6c0ef0d3ef..4aee2135af7 100644 --- a/src/Common/LRUCachePolicy.h +++ b/src/Common/LRUCachePolicy.h @@ -12,33 +12,32 @@ namespace DB /// Cache policy LRU evicts entries which are not used for a long time. /// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size) /// of that value. -/// Cache starts to evict entries when their total weight exceeds max_size. +/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes. /// Value weight should not change after insertion. /// To work with the thread-safe implementation of this class use a class "CacheBase" with first parameter "LRU" /// and next parameters in the same order as in the constructor of the current class. -template , typename WeightFunction = TrivialWeightFunction> -class LRUCachePolicy : public ICachePolicy +template , typename WeightFunction = EqualWeightFunction> +class LRUCachePolicy : public ICachePolicy { public: - using Key = TKey; - using Mapped = TMapped; - using MappedPtr = std::shared_ptr; - - using Base = ICachePolicy; + using Base = ICachePolicy; + using typename Base::MappedPtr; + using typename Base::KeyMapped; using typename Base::OnWeightLossFunction; - /** Initialize LRUCachePolicy with max_size and max_elements_size. - * max_elements_size == 0 means no elements size restrictions. + /** Initialize LRUCachePolicy with max_size_in_bytes and max_count. + * max_count == 0 means no elements size restrictions. 
*/ - explicit LRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, OnWeightLossFunction on_weight_loss_function_ = {}) - : max_size(std::max(1uz, max_size_)), max_elements_size(max_elements_size_) + LRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, OnWeightLossFunction on_weight_loss_function_) + : max_size_in_bytes(std::max(1uz, max_size_in_bytes_)) + , max_count(max_count_) + , on_weight_loss_function(on_weight_loss_function_) { - Base::on_weight_loss_function = on_weight_loss_function_; } size_t weight(std::lock_guard & /* cache_lock */) const override { - return current_size; + return current_size_in_bytes; } size_t count(std::lock_guard & /* cache_lock */) const override @@ -46,16 +45,16 @@ public: return cells.size(); } - size_t maxSize() const override + size_t maxSize(std::lock_guard & /* cache_lock */) const override { - return max_size; + return max_size_in_bytes; } void reset(std::lock_guard & /* cache_lock */) override { queue.clear(); cells.clear(); - current_size = 0; + current_size_in_bytes = 0; } void remove(const Key & key, std::lock_guard & /* cache_lock */) override @@ -64,7 +63,7 @@ public: if (it == cells.end()) return; auto & cell = it->second; - current_size -= cell.size; + current_size_in_bytes -= cell.size; queue.erase(cell.queue_iterator); cells.erase(it); } @@ -73,9 +72,7 @@ public: { auto it = cells.find(key); if (it == cells.end()) - { - return MappedPtr(); - } + return {}; Cell & cell = it->second; @@ -85,6 +82,20 @@ public: return cell.value; } + std::optional getWithKey(const Key & key, std::lock_guard & /*cache_lock*/) override + { + auto it = cells.find(key); + if (it == cells.end()) + return std::nullopt; + + Cell & cell = it->second; + + /// Move the key to the end of the queue. The iterator remains valid. + queue.splice(queue.end(), queue, cell.queue_iterator); + + return std::make_optional({it->first, cell.value}); + } + void set(const Key & key, const MappedPtr & mapped, std::lock_guard & /* cache_lock */) override { auto [it, inserted] = cells.emplace(std::piecewise_construct, @@ -107,18 +118,26 @@ public: } else { - current_size -= cell.size; + current_size_in_bytes -= cell.size; queue.splice(queue.end(), queue, cell.queue_iterator); } cell.value = mapped; cell.size = cell.value ? weight_function(*cell.value) : 0; - current_size += cell.size; + current_size_in_bytes += cell.size; removeOverflow(); } -protected: + std::vector dump() const override + { + std::vector res; + for (const auto & [key, cell] : cells) + res.push_back({key, cell.value}); + return res; + } + +private: using LRUQueue = std::list; using LRUQueueIterator = typename LRUQueue::iterator; @@ -136,18 +155,19 @@ protected: Cells cells; /// Total weight of values. 
- size_t current_size = 0; - const size_t max_size; - const size_t max_elements_size; + size_t current_size_in_bytes = 0; + const size_t max_size_in_bytes; + const size_t max_count; WeightFunction weight_function; + OnWeightLossFunction on_weight_loss_function; void removeOverflow() { size_t current_weight_lost = 0; size_t queue_size = cells.size(); - while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 0)) + while ((current_size_in_bytes > max_size_in_bytes || (max_count != 0 && queue_size > max_count)) && (queue_size > 0)) { const Key & key = queue.front(); @@ -160,7 +180,7 @@ protected: const auto & cell = it->second; - current_size -= cell.size; + current_size_in_bytes -= cell.size; current_weight_lost += cell.size; cells.erase(it); @@ -168,9 +188,9 @@ protected: --queue_size; } - Base::on_weight_loss_function(current_weight_lost); + on_weight_loss_function(current_weight_lost); - if (current_size > (1ull << 63)) + if (current_size_in_bytes > (1ull << 63)) { LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it."); abort(); diff --git a/src/Common/SLRUCachePolicy.h b/src/Common/SLRUCachePolicy.h index e1d72aa630a..e36bca83c61 100644 --- a/src/Common/SLRUCachePolicy.h +++ b/src/Common/SLRUCachePolicy.h @@ -14,37 +14,35 @@ namespace DB /// this policy protects entries which were used more then once from a sequential scan. /// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size) /// of that value. -/// Cache starts to evict entries when their total weight exceeds max_size. +/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes. /// Value weight should not change after insertion. /// To work with the thread-safe implementation of this class use a class "CacheBase" with first parameter "SLRU" /// and next parameters in the same order as in the constructor of the current class. -template , typename WeightFunction = TrivialWeightFunction> -class SLRUCachePolicy : public ICachePolicy +template , typename WeightFunction = EqualWeightFunction> +class SLRUCachePolicy : public ICachePolicy { public: - using Key = TKey; - using Mapped = TMapped; - using MappedPtr = std::shared_ptr; - - using Base = ICachePolicy; + using Base = ICachePolicy; + using typename Base::MappedPtr; + using typename Base::KeyMapped; using typename Base::OnWeightLossFunction; - /** Initialize SLRUCachePolicy with max_size and max_protected_size. + /** Initialize SLRUCachePolicy with max_size_in_bytes and max_protected_size. * max_protected_size shows how many of the most frequently used entries will not be evicted after a sequential scan. * max_protected_size == 0 means that the default protected size is equal to half of the total max size. */ /// TODO: construct from special struct with cache policy parameters (also with max_protected_size). 
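// As the class comment above notes, the thread-safe way to use this policy is CacheBase with "SLRU"
// as the first constructor argument. A small sketch of the promotion behaviour, modelled on the unit
// tests further down (template arguments and sizes are illustrative):

#include <Common/CacheBase.h>
#include <cassert>
#include <memory>

void slru_example()
{
    DB::CacheBase<int, int> cache("SLRU", /*max_size_in_bytes*/ 10, /*max_count*/ 0, /*size_ratio*/ 0.5);

    cache.set(1, std::make_shared<int>(42));
    cache.get(1);   /// accessing the entry again promotes it from the probationary to the protected queue

    for (int key = 2; key <= 20; ++key)   /// a sequential scan of one-off keys ...
        cache.set(key, std::make_shared<int>(key));

    /// ... only churns the probationary queue; key 1 survives because it is protected.
    assert(cache.get(1) != nullptr);
}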
- SLRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, double size_ratio = 0.5, OnWeightLossFunction on_weight_loss_function_ = {}) - : max_protected_size(static_cast(max_size_ * std::min(1.0, size_ratio))) - , max_size(max_size_) - , max_elements_size(max_elements_size_) - { - Base::on_weight_loss_function = on_weight_loss_function_; - } + SLRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, double size_ratio, OnWeightLossFunction on_weight_loss_function_) + : max_protected_size(static_cast(max_size_in_bytes_ * std::min(1.0, size_ratio))) + , max_size_in_bytes(max_size_in_bytes_) + , max_count(max_count_) + , on_weight_loss_function(on_weight_loss_function_) + { + } size_t weight(std::lock_guard & /* cache_lock */) const override { - return current_size; + return current_size_in_bytes; } size_t count(std::lock_guard & /* cache_lock */) const override @@ -52,9 +50,9 @@ public: return cells.size(); } - size_t maxSize() const override + size_t maxSize(std::lock_guard & /* cache_lock */) const override { - return max_size; + return max_size_in_bytes; } void reset(std::lock_guard & /* cache_lock */) override @@ -62,7 +60,7 @@ public: cells.clear(); probationary_queue.clear(); protected_queue.clear(); - current_size = 0; + current_size_in_bytes = 0; current_protected_size = 0; } @@ -72,7 +70,7 @@ public: if (it == cells.end()) return; auto & cell = it->second; - current_size -= cell.size; + current_size_in_bytes -= cell.size; if (cell.is_protected) { current_protected_size -= cell.size; @@ -86,16 +84,12 @@ public: { auto it = cells.find(key); if (it == cells.end()) - { - return MappedPtr(); - } + return {}; Cell & cell = it->second; if (cell.is_protected) - { protected_queue.splice(protected_queue.end(), protected_queue, cell.queue_iterator); - } else { cell.is_protected = true; @@ -107,6 +101,27 @@ public: return cell.value; } + std::optional getWithKey(const Key & key, std::lock_guard & /*cache_lock*/) override + { + auto it = cells.find(key); + if (it == cells.end()) + return std::nullopt; + + Cell & cell = it->second; + + if (cell.is_protected) + protected_queue.splice(protected_queue.end(), protected_queue, cell.queue_iterator); + else + { + cell.is_protected = true; + current_protected_size += cell.size; + protected_queue.splice(protected_queue.end(), probationary_queue, cell.queue_iterator); + removeOverflow(protected_queue, max_protected_size, current_protected_size, /*is_protected=*/true); + } + + return std::make_optional({it->first, cell.value}); + } + void set(const Key & key, const MappedPtr & mapped, std::lock_guard & /* cache_lock */) override { auto [it, inserted] = cells.emplace(std::piecewise_construct, @@ -129,7 +144,7 @@ public: } else { - current_size -= cell.size; + current_size_in_bytes -= cell.size; if (cell.is_protected) { current_protected_size -= cell.size; @@ -144,14 +159,22 @@ public: cell.value = mapped; cell.size = cell.value ? weight_function(*cell.value) : 0; - current_size += cell.size; + current_size_in_bytes += cell.size; current_protected_size += cell.is_protected ? 
cell.size : 0; removeOverflow(protected_queue, max_protected_size, current_protected_size, /*is_protected=*/true); - removeOverflow(probationary_queue, max_size, current_size, /*is_protected=*/false); + removeOverflow(probationary_queue, max_size_in_bytes, current_size_in_bytes, /*is_protected=*/false); } -protected: + std::vector dump() const override + { + std::vector res; + for (const auto & [key, cell] : cells) + res.push_back({key, cell.value}); + return res; + } + +private: using SLRUQueue = std::list; using SLRUQueueIterator = typename SLRUQueue::iterator; @@ -171,12 +194,13 @@ protected: Cells cells; size_t current_protected_size = 0; - size_t current_size = 0; + size_t current_size_in_bytes = 0; const size_t max_protected_size; - const size_t max_size; - const size_t max_elements_size; + const size_t max_size_in_bytes; + const size_t max_count; WeightFunction weight_function; + OnWeightLossFunction on_weight_loss_function; void removeOverflow(SLRUQueue & queue, const size_t max_weight_size, size_t & current_weight_size, bool is_protected) { @@ -188,11 +212,11 @@ protected: { /// Check if after remove all elements from probationary part there will be no more than max elements /// in protected queue and weight of all protected elements will be less then max protected weight. - /// It's not possible to check only cells.size() > max_elements_size + /// It's not possible to check only cells.size() > max_count /// because protected elements move to probationary part and still remain in cache. need_remove = [&]() { - return ((max_elements_size != 0 && cells.size() - probationary_queue.size() > max_elements_size) + return ((max_count != 0 && cells.size() - probationary_queue.size() > max_count) || (current_weight_size > max_weight_size)) && (queue_size > 0); }; } @@ -200,7 +224,7 @@ protected: { need_remove = [&]() { - return ((max_elements_size != 0 && cells.size() > max_elements_size) + return ((max_count != 0 && cells.size() > max_count) || (current_weight_size > max_weight_size)) && (queue_size > 0); }; } @@ -236,11 +260,9 @@ protected: } if (!is_protected) - { - Base::on_weight_loss_function(current_weight_lost); - } + on_weight_loss_function(current_weight_lost); - if (current_size > (1ull << 63)) + if (current_size_in_bytes > (1ull << 63)) { LOG_ERROR(&Poco::Logger::get("SLRUCache"), "SLRUCache became inconsistent. There must be a bug in it."); abort(); diff --git a/src/Common/TTLCachePolicy.h b/src/Common/TTLCachePolicy.h new file mode 100644 index 00000000000..fc6367b807f --- /dev/null +++ b/src/Common/TTLCachePolicy.h @@ -0,0 +1,148 @@ +#pragma once + +#include + +#include + +namespace DB +{ + +/// TTLCachePolicy evicts entries for which IsStaleFunction returns true. +/// The cache size (in bytes and number of entries) can be changed at runtime. It is expected to set both sizes explicitly after construction. 
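// The query cache below is the first user of this policy: it hands a ready-made policy object to
// CacheBase's new third constructor and then sizes the cache from the server configuration. A rough
// sketch of that wiring -- the template parameter order (Key, Mapped, HashFunction, WeightFunction,
// IsStaleFunction) is assumed from the QueryCache instantiation, and all concrete types here are
// illustrative:

#include <Common/CacheBase.h>
#include <Common/TTLCachePolicy.h>
#include <chrono>
#include <functional>
#include <memory>
#include <string>

struct TTLKey
{
    std::string name;
    std::chrono::time_point<std::chrono::system_clock> expires_at;
    bool operator==(const TTLKey & other) const { return name == other.name; }
};

struct TTLKeyHash
{
    size_t operator()(const TTLKey & key) const { return std::hash<std::string>()(key.name); }
};

struct TTLKeyIsStale
{
    bool operator()(const TTLKey & key) const { return key.expires_at < std::chrono::system_clock::now(); }
};

using TTLPolicy = DB::TTLCachePolicy<TTLKey, std::string, TTLKeyHash,
                                     DB::EqualWeightFunction<std::string>, TTLKeyIsStale>;

void ttl_example()
{
    DB::CacheBase<TTLKey, std::string, TTLKeyHash> cache(std::make_unique<TTLPolicy>());

    /// Both limits start at 0, so nothing is cached until they are set explicitly.
    cache.setMaxSize(1024 * 1024);
    cache.setMaxCount(128);   /// shrinking later is lazy: only enforced on the next set()
}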
+template +class TTLCachePolicy : public ICachePolicy +{ +public: + using Base = ICachePolicy; + using typename Base::MappedPtr; + using typename Base::KeyMapped; + using typename Base::OnWeightLossFunction; + + TTLCachePolicy() + : max_size_in_bytes(0) + , max_count(0) + { + } + + size_t weight(std::lock_guard & /* cache_lock */) const override + { + return size_in_bytes; + } + + size_t count(std::lock_guard & /* cache_lock */) const override + { + return cache.size(); + } + + size_t maxSize(std::lock_guard & /* cache_lock */) const override + { + return max_size_in_bytes; + } + + void setMaxCount(size_t max_count_, std::lock_guard & /* cache_lock */) override + { + /// lazy behavior: the cache only shrinks upon the next insert + max_count = max_count_; + } + + void setMaxSize(size_t max_size_in_bytes_, std::lock_guard & /* cache_lock */) override + { + /// lazy behavior: the cache only shrinks upon the next insert + max_size_in_bytes = max_size_in_bytes_; + } + + void reset(std::lock_guard & /* cache_lock */) override + { + cache.clear(); + } + + void remove(const Key & key, std::lock_guard & /* cache_lock */) override + { + auto it = cache.find(key); + if (it == cache.end()) + return; + size_in_bytes -= weight_function(*it->second); + cache.erase(it); + } + + MappedPtr get(const Key & key, std::lock_guard & /* cache_lock */) override + { + auto it = cache.find(key); + if (it == cache.end()) + return {}; + return it->second; + } + + std::optional getWithKey(const Key & key, std::lock_guard & /* cache_lock */) override + { + auto it = cache.find(key); + if (it == cache.end()) + return std::nullopt; + return std::make_optional({it->first, it->second}); + } + + /// Evicts on a best-effort basis. If there are too many non-stale entries, the new entry may not be cached at all! + void set(const Key & key, const MappedPtr & mapped, std::lock_guard & /* cache_lock */) override + { + chassert(mapped.get()); + + const size_t entry_size_in_bytes = weight_function(*mapped); + + auto sufficient_space_in_cache = [&]() + { + return (size_in_bytes + entry_size_in_bytes <= max_size_in_bytes) && (cache.size() + 1 <= max_count); + }; + + if (!sufficient_space_in_cache()) + { + /// Remove stale entries + for (auto it = cache.begin(); it != cache.end();) + if (is_stale_function(it->first)) + { + size_in_bytes -= weight_function(*it->second); + it = cache.erase(it); + } + else + ++it; + } + + if (sufficient_space_in_cache()) + { + /// Insert or replace key + if (auto it = cache.find(key); it != cache.end()) + { + size_in_bytes -= weight_function(*it->second); + cache.erase(it); // stupid bug: (*) doesn't replace existing entries (likely due to custom hash function), need to erase explicitly + } + + cache[key] = std::move(mapped); // (*) + size_in_bytes += entry_size_in_bytes; + } + } + + std::vector dump() const override + { + std::vector res; + for (const auto & [key, mapped] : cache) + res.push_back({key, mapped}); + return res; + } + +private: + using Cache = std::unordered_map; + Cache cache; + + /// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators + /// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a + /// binary search on the sorted container and erase all left of the found key. 
+ + size_t size_in_bytes = 0; + size_t max_size_in_bytes; + size_t max_count; + + WeightFunction weight_function; + IsStaleFunction is_stale_function; + /// TODO support OnWeightLossFunction callback +}; + +} diff --git a/src/Common/tests/gtest_lru_cache.cpp b/src/Common/tests/gtest_lru_cache.cpp index f74d1eb9464..1185dd58e5e 100644 --- a/src/Common/tests/gtest_lru_cache.cpp +++ b/src/Common/tests/gtest_lru_cache.cpp @@ -6,7 +6,7 @@ TEST(LRUCache, set) { using SimpleCacheBase = DB::CacheBase; - auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); @@ -19,7 +19,7 @@ TEST(LRUCache, set) TEST(LRUCache, update) { using SimpleCacheBase = DB::CacheBase; - auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(1, std::make_shared(3)); auto val = lru_cache.get(1); @@ -30,7 +30,7 @@ TEST(LRUCache, update) TEST(LRUCache, get) { using SimpleCacheBase = DB::CacheBase; - auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); SimpleCacheBase::MappedPtr value = lru_cache.get(1); @@ -50,7 +50,7 @@ struct ValueWeight TEST(LRUCache, evictOnSize) { using SimpleCacheBase = DB::CacheBase; - auto lru_cache = SimpleCacheBase(/*max_size*/ 20, /*max_elements_size*/ 3, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 20, /*max_count*/ 3); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); lru_cache.set(3, std::make_shared(4)); @@ -66,7 +66,7 @@ TEST(LRUCache, evictOnSize) TEST(LRUCache, evictOnWeight) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10); lru_cache.set(1, std::make_shared(2)); lru_cache.set(2, std::make_shared(3)); lru_cache.set(3, std::make_shared(4)); @@ -87,7 +87,7 @@ TEST(LRUCache, evictOnWeight) TEST(LRUCache, getOrSet) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU"); + auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10); size_t x = 10; auto load_func = [&] { return std::make_shared(x); }; auto [value, loaded] = lru_cache.getOrSet(1, load_func); diff --git a/src/Common/tests/gtest_slru_cahce.cpp b/src/Common/tests/gtest_slru_cache.cpp similarity index 81% rename from src/Common/tests/gtest_slru_cahce.cpp rename to src/Common/tests/gtest_slru_cache.cpp index 66df0dbec77..52549592f0e 100644 --- a/src/Common/tests/gtest_slru_cahce.cpp +++ b/src/Common/tests/gtest_slru_cache.cpp @@ -6,7 +6,7 @@ TEST(SLRUCache, set) { using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); @@ -19,7 +19,7 @@ TEST(SLRUCache, set) TEST(SLRUCache, update) { 
using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(1, std::make_shared(3)); @@ -31,7 +31,7 @@ TEST(SLRUCache, update) TEST(SLRUCache, get) { using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); @@ -47,7 +47,7 @@ TEST(SLRUCache, get) TEST(SLRUCache, remove) { using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); @@ -63,7 +63,7 @@ TEST(SLRUCache, remove) TEST(SLRUCache, removeFromProtected) { using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/2, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/2, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(1, std::make_shared(3)); @@ -96,7 +96,7 @@ TEST(SLRUCache, removeFromProtected) TEST(SLRUCache, reset) { using SimpleCacheBase = DB::CacheBase; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); @@ -119,7 +119,7 @@ struct ValueWeight TEST(SLRUCache, evictOnElements) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/1, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase(/*max_size_in_bytes=*/10, /*max_count=*/1, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); @@ -140,7 +140,7 @@ TEST(SLRUCache, evictOnElements) TEST(SLRUCache, evictOnWeight) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase(/*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(2, std::make_shared(3)); slru_cache.set(3, std::make_shared(4)); @@ -161,7 +161,7 @@ TEST(SLRUCache, evictOnWeight) TEST(SLRUCache, evictFromProtectedPart) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(1, std::make_shared(2)); @@ -177,7 +177,7 @@ TEST(SLRUCache, evictFromProtectedPart) TEST(SLRUCache, evictStreamProtected) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = 
SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); slru_cache.set(1, std::make_shared(2)); slru_cache.set(1, std::make_shared(2)); @@ -201,7 +201,7 @@ TEST(SLRUCache, evictStreamProtected) TEST(SLRUCache, getOrSet) { using SimpleCacheBase = DB::CacheBase, ValueWeight>; - auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5); + auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5); size_t x = 5; auto load_func = [&] { return std::make_shared(x); }; auto [value, loaded] = slru_cache.getOrSet(1, load_func); diff --git a/src/IO/UncompressedCache.h b/src/IO/UncompressedCache.h index 3d1c907d364..2e654b27ed7 100644 --- a/src/IO/UncompressedCache.h +++ b/src/IO/UncompressedCache.h @@ -42,8 +42,11 @@ private: using Base = CacheBase; public: - explicit UncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy = "") - : Base(max_size_in_bytes, 0, uncompressed_cache_policy) {} + explicit UncompressedCache(size_t max_size_in_bytes) + : Base(max_size_in_bytes) {} + + UncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes) + : Base(uncompressed_cache_policy, max_size_in_bytes) {} /// Calculate key from path to file and offset. static UInt128 hash(const String & path_to_file, size_t offset) diff --git a/src/Interpreters/Cache/QueryCache.cpp b/src/Interpreters/Cache/QueryCache.cpp index b0c8766e505..ce2373a8af9 100644 --- a/src/Interpreters/Cache/QueryCache.cpp +++ b/src/Interpreters/Cache/QueryCache.cpp @@ -10,6 +10,7 @@ #include #include #include +#include #include #include /// chassert @@ -152,43 +153,32 @@ size_t QueryCache::KeyHasher::operator()(const Key & key) const return res; } -size_t QueryCache::QueryResult::sizeInBytes() const +size_t QueryCache::QueryResultWeight::operator()(const QueryResult & chunks) const { size_t res = 0; - for (const auto & chunk : *chunks) + for (const auto & chunk : chunks) res += chunk.allocatedBytes(); return res; -}; +} -namespace -{ - -auto is_stale = [](const QueryCache::Key & key) +bool QueryCache::IsStale::operator()(const Key & key) const { return (key.expires_at < std::chrono::system_clock::now()); }; -} - -QueryCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key & key_, - size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_, - size_t max_cache_entries_, +QueryCache::Writer::Writer(Cache & cache_, const Key & key_, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_, std::chrono::milliseconds min_query_runtime_) - : mutex(mutex_) - , cache(cache_) + : cache(cache_) , key(key_) - , cache_size_in_bytes(cache_size_in_bytes_) - , max_cache_size_in_bytes(max_cache_size_in_bytes_) - , max_cache_entries(max_cache_entries_) , max_entry_size_in_bytes(max_entry_size_in_bytes_) , max_entry_size_in_rows(max_entry_size_in_rows_) , min_query_runtime(min_query_runtime_) { - if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first)) + if (auto entry = cache.getWithKey(key); entry.has_value() && !IsStale()(entry->key)) { skip_insert = true; /// Key already contained in cache and did not expire yet --> don't replace it - LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst()); + LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst()); } } @@ -197,18 +187,20 @@ void QueryCache::Writer::buffer(Chunk 
&& partial_query_result) if (skip_insert) return; - auto & chunks = query_result.chunks; + std::lock_guard lock(mutex); - chunks->emplace_back(std::move(partial_query_result)); + auto & chunks = *query_result; - new_entry_size_in_bytes += chunks->back().allocatedBytes(); - new_entry_size_in_rows += chunks->back().getNumRows(); + chunks.emplace_back(std::move(partial_query_result)); + + new_entry_size_in_bytes += chunks.back().allocatedBytes(); + new_entry_size_in_rows += chunks.back().getNumRows(); if ((new_entry_size_in_bytes > max_entry_size_in_bytes) || (new_entry_size_in_rows > max_entry_size_in_rows)) { - chunks->clear(); /// eagerly free some space + chunks.clear(); /// eagerly free some space skip_insert = true; - LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query result too big), new_entry_size_in_bytes: {} ({}), new_entry_size_in_rows: {} ({}), query: {}", new_entry_size_in_bytes, max_entry_size_in_bytes, new_entry_size_in_rows, max_entry_size_in_rows, key.queryStringFromAst()); + LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (query result too big), new_entry_size_in_bytes: {} ({}), new_entry_size_in_rows: {} ({}), query: {}", new_entry_size_in_bytes, max_entry_size_in_bytes, new_entry_size_in_rows, max_entry_size_in_rows, key.queryStringFromAst()); } } @@ -217,81 +209,47 @@ void QueryCache::Writer::finalizeWrite() if (skip_insert) return; - if (std::chrono::duration_cast(std::chrono::system_clock::now() - query_start_time) < min_query_runtime) - { - LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query not expensive enough), query: {}", key.queryStringFromAst()); - return; - } - std::lock_guard lock(mutex); - if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first)) + if (std::chrono::duration_cast(std::chrono::system_clock::now() - query_start_time) < min_query_runtime) { - /// same check as in ctor because a parallel Writer could have inserted the current key in the meantime - LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst()); + LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (query not expensive enough), query: {}", key.queryStringFromAst()); return; } - auto sufficient_space_in_cache = [this]() TSA_REQUIRES(mutex) + if (auto entry = cache.getWithKey(key); entry.has_value() && !IsStale()(entry->key)) { - return (cache_size_in_bytes + new_entry_size_in_bytes <= max_cache_size_in_bytes) && (cache.size() + 1 <= max_cache_entries); - }; - - if (!sufficient_space_in_cache()) - { - size_t removed_items = 0; - /// Remove stale entries - for (auto it = cache.begin(); it != cache.end();) - if (is_stale(it->first)) - { - cache_size_in_bytes -= it->second.sizeInBytes(); - it = cache.erase(it); - ++removed_items; - } - else - ++it; - LOG_TRACE(&Poco::Logger::get("QueryCache"), "Removed {} stale entries", removed_items); + /// same check as in ctor because a parallel Writer could have inserted the current key in the meantime + LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst()); + return; } - if (!sufficient_space_in_cache()) - LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (cache has insufficient space), query: {}", key.queryStringFromAst()); - else - { - //// Insert or replace key - cache_size_in_bytes += query_result.sizeInBytes(); - if (auto it = cache.find(key); it != cache.end()) - cache_size_in_bytes -= it->second.sizeInBytes(); // key 
replacement - - cache[key] = std::move(query_result); - LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stored result of query {}", key.queryStringFromAst()); - } + cache.set(key, query_result); } -QueryCache::Reader::Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_, const std::lock_guard &) +QueryCache::Reader::Reader(Cache & cache_, const Key & key, const std::lock_guard &) { - auto it = cache_.find(key); + auto entry = cache_.getWithKey(key); - if (it == cache_.end()) + if (!entry.has_value()) { LOG_TRACE(&Poco::Logger::get("QueryCache"), "No entry found for query {}", key.queryStringFromAst()); return; } - if (it->first.username.has_value() && it->first.username != key.username) + if (entry->key.username.has_value() && entry->key.username != key.username) { LOG_TRACE(&Poco::Logger::get("QueryCache"), "Inaccessible entry found for query {}", key.queryStringFromAst()); return; } - if (is_stale(it->first)) + if (IsStale()(entry->key)) { - cache_size_in_bytes_ -= it->second.sizeInBytes(); - const_cast(cache_).erase(it); - LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stale entry found and removed for query {}", key.queryStringFromAst()); + LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stale entry found for query {}", key.queryStringFromAst()); return; } - pipe = Pipe(std::make_shared(it->first.header, it->second.chunks)); + pipe = Pipe(std::make_shared(entry->key.header, entry->mapped)); LOG_TRACE(&Poco::Logger::get("QueryCache"), "Entry found for query {}", key.queryStringFromAst()); } @@ -316,19 +274,19 @@ Pipe && QueryCache::Reader::getPipe() QueryCache::Reader QueryCache::createReader(const Key & key) { std::lock_guard lock(mutex); - return Reader(cache, key, cache_size_in_bytes, lock); + return Reader(cache, key, lock); } QueryCache::Writer QueryCache::createWriter(const Key & key, std::chrono::milliseconds min_query_runtime) { std::lock_guard lock(mutex); - return Writer(mutex, cache, key, cache_size_in_bytes, max_cache_size_in_bytes, max_cache_entries, max_cache_entry_size_in_bytes, max_cache_entry_size_in_rows, min_query_runtime); + return Writer(cache, key, max_entry_size_in_bytes, max_entry_size_in_rows, min_query_runtime); } void QueryCache::reset() { + cache.reset(); std::lock_guard lock(mutex); - cache.clear(); times_executed.clear(); cache_size_in_bytes = 0; } @@ -344,13 +302,28 @@ size_t QueryCache::recordQueryRun(const Key & key) return times; } +std::vector QueryCache::dump() const +{ + return cache.dump(); +} + +QueryCache::QueryCache() + : cache(std::make_unique>()) +{ +} + void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & config) { std::lock_guard lock(mutex); - max_cache_size_in_bytes = config.getUInt64("query_cache.size", 1_GiB); - max_cache_entries = config.getUInt64("query_cache.max_entries", 1024); - max_cache_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size", 1_MiB); - max_cache_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows", 30'000'000); + + size_t max_size_in_bytes = config.getUInt64("query_cache.size", 1_GiB); + cache.setMaxSize(max_size_in_bytes); + + size_t max_entries = config.getUInt64("query_cache.max_entries", 1024); + cache.setMaxCount(max_entries); + + max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size", 1_MiB); + max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows", 30'000'000); } } diff --git a/src/Interpreters/Cache/QueryCache.h b/src/Interpreters/Cache/QueryCache.h index 66477d77dcb..763e797ac07 100644 --- 
a/src/Interpreters/Cache/QueryCache.h +++ b/src/Interpreters/Cache/QueryCache.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -41,7 +42,7 @@ public: /// Result metadata for constructing the pipe. const Block header; - /// Std::nullopt means that the associated entry can be read by other users. In general, sharing is a bad idea: First, it is + /// std::nullopt means that the associated entry can be read by other users. In general, sharing is a bad idea: First, it is /// unlikely that different users pose the same queries. Second, sharing potentially breaches security. E.g. User A should not be /// able to bypass row policies on some table by running the same queries as user B for whom no row policies exist. const std::optional username; @@ -57,15 +58,7 @@ public: String queryStringFromAst() const; }; - struct QueryResult - { - std::shared_ptr chunks = std::make_shared(); - size_t sizeInBytes() const; - - /// Notes: 1. For performance reasons, we cache the original result chunks as-is (no concatenation during cache insert or lookup). - /// 2. Ref-counting (shared_ptr) ensures that eviction of an entry does not affect queries which still read from the cache. - /// (this can also be achieved by copying the chunks during lookup but that would be under the cache lock --> too slow) - }; + using QueryResult = Chunks; private: struct KeyHasher @@ -73,8 +66,18 @@ private: size_t operator()(const Key & key) const; }; + struct QueryResultWeight + { + size_t operator()(const QueryResult & chunks) const; + }; + + struct IsStale + { + bool operator()(const Key & key) const; + }; + /// query --> query result - using Cache = std::unordered_map; + using Cache = CacheBase; /// query --> query execution count using TimesExecuted = std::unordered_map; @@ -97,24 +100,19 @@ public: void buffer(Chunk && partial_query_result); void finalizeWrite(); private: - std::mutex & mutex; - Cache & cache TSA_GUARDED_BY(mutex); + std::mutex mutex; + Cache & cache; const Key key; - size_t & cache_size_in_bytes TSA_GUARDED_BY(mutex); - const size_t max_cache_size_in_bytes; - const size_t max_cache_entries; - size_t new_entry_size_in_bytes = 0; + size_t new_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0; const size_t max_entry_size_in_bytes; - size_t new_entry_size_in_rows = 0; + size_t new_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0; const size_t max_entry_size_in_rows; const std::chrono::time_point query_start_time = std::chrono::system_clock::now(); /// Writer construction and finalizeWrite() coincide with query start/end const std::chrono::milliseconds min_query_runtime; - QueryResult query_result; + std::shared_ptr query_result TSA_GUARDED_BY(mutex) = std::make_shared(); std::atomic skip_insert = false; - Writer(std::mutex & mutex_, Cache & cache_, const Key & key_, - size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_, - size_t max_cache_entries_, + Writer(Cache & cache_, const Key & key_, size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_, std::chrono::milliseconds min_query_runtime_); @@ -128,11 +126,13 @@ public: bool hasCacheEntryForKey() const; Pipe && getPipe(); /// must be called only if hasCacheEntryForKey() returns true private: - Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_, const std::lock_guard &); + Reader(Cache & cache_, const Key & key, const std::lock_guard &); Pipe pipe; friend class QueryCache; /// for createReader() }; + QueryCache(); + void updateConfiguration(const Poco::Util::AbstractConfiguration & config); Reader 
createReader(const Key & key); @@ -143,23 +143,18 @@ public: /// Record new execution of query represented by key. Returns number of executions so far. size_t recordQueryRun(const Key & key); + /// For debugging and system tables + std::vector dump() const; + private: - /// Implementation note: The query result implements a custom caching mechanism and doesn't make use of CacheBase, unlike many other - /// internal caches in ClickHouse. The main reason is that we don't need standard CacheBase (S)LRU eviction as the expiry times - /// associated with cache entries provide a "natural" eviction criterion. As a future TODO, we could make an expiry-based eviction - /// policy and use that with CacheBase (e.g. see #23706) - /// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators - /// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a - /// binary search on the sorted container and erase all left of the found key. + Cache cache; + mutable std::mutex mutex; - Cache cache TSA_GUARDED_BY(mutex); TimesExecuted times_executed TSA_GUARDED_BY(mutex); /// Cache configuration - size_t max_cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; - size_t max_cache_entries TSA_GUARDED_BY(mutex) = 0; - size_t max_cache_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0; - size_t max_cache_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0; + size_t max_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0; + size_t max_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0; size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// Updated in each cache insert/delete diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index 2cfa55f0d87..4e882f3ab5b 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1954,14 +1954,14 @@ QueryStatusPtr Context::getProcessListElement() const } -void Context::setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy) +void Context::setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes) { auto lock = getLock(); if (shared->uncompressed_cache) throw Exception(ErrorCodes::LOGICAL_ERROR, "Uncompressed cache has been already created."); - shared->uncompressed_cache = std::make_shared(max_size_in_bytes, uncompressed_cache_policy); + shared->uncompressed_cache = std::make_shared(uncompressed_cache_policy, max_size_in_bytes); } @@ -1980,14 +1980,14 @@ void Context::dropUncompressedCache() const } -void Context::setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy) +void Context::setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes) { auto lock = getLock(); if (shared->mark_cache) throw Exception(ErrorCodes::LOGICAL_ERROR, "Mark cache has been already created."); - shared->mark_cache = std::make_shared(cache_size_in_bytes, mark_cache_policy); + shared->mark_cache = std::make_shared(mark_cache_policy, cache_size_in_bytes); } MarkCachePtr Context::getMarkCache() const diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 67594a41459..bbfbd4defdc 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -861,12 +861,12 @@ public: void setSystemZooKeeperLogAfterInitializationIfNeeded(); /// Create a cache of uncompressed blocks of specified size. This can be done only once. 
- void setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy); + void setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes); std::shared_ptr getUncompressedCache() const; void dropUncompressedCache() const; /// Create a cache of marks of specified size. This can be done only once. - void setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy); + void setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes); std::shared_ptr getMarkCache() const; void dropMarkCache() const; ThreadPool & getLoadMarksThreadpool() const; diff --git a/src/Storages/MarkCache.h b/src/Storages/MarkCache.h index f9272b1f4bb..2b286ebb1e8 100644 --- a/src/Storages/MarkCache.h +++ b/src/Storages/MarkCache.h @@ -40,8 +40,11 @@ private: using Base = CacheBase; public: - explicit MarkCache(size_t max_size_in_bytes, const String & mark_cache_policy = "") - : Base(max_size_in_bytes, 0, mark_cache_policy) {} + explicit MarkCache(size_t max_size_in_bytes) + : Base(max_size_in_bytes) {} + + MarkCache(const String & mark_cache_policy, size_t max_size_in_bytes) + : Base(mark_cache_policy, max_size_in_bytes) {} /// Calculate key from path to file and offset. static UInt128 hash(const String & path_to_file) diff --git a/src/Storages/System/StorageSystemQueryCache.cpp b/src/Storages/System/StorageSystemQueryCache.cpp index 2de8e4594b9..2cbcc773ad6 100644 --- a/src/Storages/System/StorageSystemQueryCache.cpp +++ b/src/Storages/System/StorageSystemQueryCache.cpp @@ -33,11 +33,11 @@ void StorageSystemQueryCache::fillData(MutableColumns & res_columns, ContextPtr if (!query_cache) return; + std::vector content = query_cache->dump(); + const String & username = context->getUserName(); - std::lock_guard lock(query_cache->mutex); - - for (const auto & [key, result] : query_cache->cache) + for (const auto & [key, query_result] : content) { /// Showing other user's queries is considered a security risk if (key.username.has_value() && key.username != username) @@ -48,7 +48,7 @@ void StorageSystemQueryCache::fillData(MutableColumns & res_columns, ContextPtr res_columns[2]->insert(std::chrono::system_clock::to_time_t(key.expires_at)); res_columns[3]->insert(key.expires_at < std::chrono::system_clock::now()); res_columns[4]->insert(!key.username.has_value()); - res_columns[5]->insert(result.sizeInBytes()); + res_columns[5]->insert(QueryCache::QueryResultWeight()(*query_result)); } }
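// For reference, the end-to-end flow of the refactored QueryCache as exposed by QueryCache.h above:
// the write path buffers chunks and commits on finalizeWrite(), the read path hands out a Pipe over
// the cached chunks. A sketch assuming a prepared QueryCache::Key; the helper names and the zero
// min_query_runtime are illustrative:

#include <Interpreters/Cache/QueryCache.h>
#include <chrono>

using namespace DB;

void cache_query_result(QueryCache & query_cache, const QueryCache::Key & key, Chunks partial_results)
{
    auto writer = query_cache.createWriter(key, /*min_query_runtime*/ std::chrono::milliseconds(0));
    for (auto & chunk : partial_results)
        writer.buffer(std::move(chunk));   /// sets skip_insert if the entry grows beyond the per-entry limits
    writer.finalizeWrite();                /// also skipped if a non-stale entry for this key already exists
}

void read_cached_query_result(QueryCache & query_cache, const QueryCache::Key & key)
{
    auto reader = query_cache.createReader(key);
    if (reader.hasCacheEntryForKey())
    {
        auto pipe = reader.getPipe();      /// replaces query execution with the cached chunks
        /// ... plug the pipe into the query pipeline ...
    }
}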