Merge pull request #47428 from ClickHouse/rs/qc-cachebase

Refactor query cache (make use of CacheBase)
This commit is contained in:
Robert Schulze 2023-03-20 20:49:41 +01:00 committed by GitHub
commit 91e1de2d59
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 474 additions and 248 deletions

View File

@ -600,13 +600,13 @@ void LocalServer::processConfig()
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
if (uncompressed_cache_size)
global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
if (mark_cache_size)
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);

View File

@ -1456,7 +1456,7 @@ try
LOG_INFO(log, "Uncompressed cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(uncompressed_cache_size));
}
global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
global_context->setUncompressedCache(uncompressed_cache_policy, uncompressed_cache_size);
/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
@ -1481,7 +1481,7 @@ try
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
global_context->setMarkCache(mark_cache_policy, mark_cache_size);
if (server_settings.index_uncompressed_cache_size)
global_context->setIndexUncompressedCache(server_settings.index_uncompressed_cache_size);

View File

@ -27,39 +27,55 @@ namespace ErrorCodes
/// (default policy evicts entries which are not used for a long time).
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes.
/// Value weight should not change after insertion.
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = EqualWeightFunction<TMapped>>
class CacheBase
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
private:
using CachePolicy = ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
explicit CacheBase(size_t max_size, size_t max_elements_size = 0, String cache_policy_name = "", double size_ratio = 0.5)
public:
using Key = typename CachePolicy::Key;
using Mapped = typename CachePolicy::Mapped;
using MappedPtr = typename CachePolicy::MappedPtr;
using KeyMapped = typename CachePolicy::KeyMapped;
/// Use this ctor if you don't care about the internal cache policy.
explicit CacheBase(size_t max_size_in_bytes, size_t max_count = 0, double size_ratio = 0.5)
: CacheBase("SLRU", max_size_in_bytes, max_count, size_ratio)
{
}
/// Use this ctor if you want the user to configure the cache policy via some setting. Supports only general-purpose policies LRU and SLRU.
explicit CacheBase(std::string_view cache_policy_name, size_t max_size_in_bytes, size_t max_count = 0, double size_ratio = 0.5)
{
auto on_weight_loss_function = [&](size_t weight_loss) { onRemoveOverflowWeightLoss(weight_loss); };
static constexpr std::string_view default_cache_policy = "SLRU";
if (cache_policy_name.empty())
cache_policy_name = default_cache_policy_name;
cache_policy_name = default_cache_policy;
if (cache_policy_name == "LRU")
{
using LRUPolicy = LRUCachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
cache_policy = std::make_unique<LRUPolicy>(max_size, max_elements_size, on_weight_loss_function);
cache_policy = std::make_unique<LRUPolicy>(max_size_in_bytes, max_count, on_weight_loss_function);
}
else if (cache_policy_name == "SLRU")
{
using SLRUPolicy = SLRUCachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
cache_policy = std::make_unique<SLRUPolicy>(max_size, max_elements_size, size_ratio, on_weight_loss_function);
cache_policy = std::make_unique<SLRUPolicy>(max_size_in_bytes, max_count, size_ratio, on_weight_loss_function);
}
else
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Undeclared cache policy name: {}", cache_policy_name);
}
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown cache policy name: {}", cache_policy_name);
}
/// Use this ctor to provide an arbitrary cache policy.
explicit CacheBase(std::unique_ptr<ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>> cache_policy_)
: cache_policy(std::move(cache_policy_))
{}
MappedPtr get(const Key & key)
{
std::lock_guard lock(mutex);
@ -68,7 +84,17 @@ public:
++hits;
else
++misses;
return res;
}
std::optional<KeyMapped> getWithKey(const Key & key)
{
std::lock_guard lock(mutex);
auto res = cache_policy->getWithKey(key, lock);
if (res.has_value())
++hits;
else
++misses;
return res;
}
@ -147,6 +173,12 @@ public:
out_misses = misses;
}
std::vector<KeyMapped> dump() const
{
std::lock_guard lock(mutex);
return cache_policy->dump();
}
void reset()
{
std::lock_guard lock(mutex);
@ -175,9 +207,21 @@ public:
}
size_t maxSize() const
TSA_NO_THREAD_SAFETY_ANALYSIS // disabled because max_size of cache_policy is a constant parameter
{
return cache_policy->maxSize();
std::lock_guard lock(mutex);
return cache_policy->maxSize(lock);
}
void setMaxCount(size_t max_count)
{
std::lock_guard lock(mutex);
return cache_policy->setMaxCount(max_count, lock);
}
void setMaxSize(size_t max_size_in_bytes)
{
std::lock_guard lock(mutex);
return cache_policy->setMaxSize(max_size_in_bytes, lock);
}
virtual ~CacheBase() = default;
@ -186,12 +230,8 @@ protected:
mutable std::mutex mutex;
private:
using CachePolicy = ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
std::unique_ptr<CachePolicy> cache_policy TSA_GUARDED_BY(mutex);
inline static const String default_cache_policy_name = "SLRU";
std::atomic<size_t> hits{0};
std::atomic<size_t> misses{0};

View File

@ -1,13 +1,21 @@
#pragma once
#include <Common/Exception.h>
#include <functional>
#include <memory>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
template <typename T>
struct TrivialWeightFunction
struct EqualWeightFunction
{
size_t operator()(const T &) const
{
@ -15,7 +23,7 @@ struct TrivialWeightFunction
}
};
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = EqualWeightFunction<TMapped>>
class ICachePolicy
{
public:
@ -24,19 +32,33 @@ public:
using MappedPtr = std::shared_ptr<Mapped>;
using OnWeightLossFunction = std::function<void(size_t)>;
struct KeyMapped
{
Key key;
MappedPtr mapped;
};
virtual size_t weight(std::lock_guard<std::mutex> & /* cache_lock */) const = 0;
virtual size_t count(std::lock_guard<std::mutex> & /* cache_lock */) const = 0;
virtual size_t maxSize() const = 0;
virtual size_t maxSize(std::lock_guard<std::mutex>& /* cache_lock */) const = 0;
virtual void reset(std::lock_guard<std::mutex> & /* cache_lock */) = 0;
virtual void remove(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
virtual void setMaxCount(size_t /*max_count*/, std::lock_guard<std::mutex> & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); }
virtual void setMaxSize(size_t /*max_size_in_bytes*/, std::lock_guard<std::mutex> & /* cache_lock */) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented for cache policy"); }
/// HashFunction usually hashes the entire key and the found key will be equal to the provided key. In such cases, use get(). It is also
/// possible to store other, non-hashed data in the key. In that case, the found key is potentially different from the provided key.
/// Then use getWithKey() to also return the found key including its non-hashed data.
virtual MappedPtr get(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
virtual std::optional<KeyMapped> getWithKey(const Key &, std::lock_guard<std::mutex> & /*cache_lock*/) = 0;
virtual void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
virtual ~ICachePolicy() = default;
virtual void remove(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) = 0;
protected:
OnWeightLossFunction on_weight_loss_function = [](size_t) {};
virtual void reset(std::lock_guard<std::mutex> & /* cache_lock */) = 0;
virtual std::vector<KeyMapped> dump() const = 0;
virtual ~ICachePolicy() = default;
};
}

View File

@ -12,33 +12,32 @@ namespace DB
/// Cache policy LRU evicts entries which are not used for a long time.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes.
/// Value weight should not change after insertion.
/// To work with the thread-safe implementation of this class use a class "CacheBase" with first parameter "LRU"
/// and next parameters in the same order as in the constructor of the current class.
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class LRUCachePolicy : public ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>
template <typename Key, typename Mapped, typename HashFunction = std::hash<Key>, typename WeightFunction = EqualWeightFunction<Mapped>>
class LRUCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunction>
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
using Base = ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
using Base = ICachePolicy<Key, Mapped, HashFunction, WeightFunction>;
using typename Base::MappedPtr;
using typename Base::KeyMapped;
using typename Base::OnWeightLossFunction;
/** Initialize LRUCachePolicy with max_size and max_elements_size.
* max_elements_size == 0 means no elements size restrictions.
/** Initialize LRUCachePolicy with max_size_in_bytes and max_count.
* max_count == 0 means no restriction on the number of elements.
*/
explicit LRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, OnWeightLossFunction on_weight_loss_function_ = {})
: max_size(std::max(1uz, max_size_)), max_elements_size(max_elements_size_)
LRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, OnWeightLossFunction on_weight_loss_function_)
: max_size_in_bytes(std::max(1uz, max_size_in_bytes_))
, max_count(max_count_)
, on_weight_loss_function(on_weight_loss_function_)
{
Base::on_weight_loss_function = on_weight_loss_function_;
}
size_t weight(std::lock_guard<std::mutex> & /* cache_lock */) const override
{
return current_size;
return current_size_in_bytes;
}
size_t count(std::lock_guard<std::mutex> & /* cache_lock */) const override
@ -46,16 +45,16 @@ public:
return cells.size();
}
size_t maxSize() const override
size_t maxSize(std::lock_guard<std::mutex> & /* cache_lock */) const override
{
return max_size;
return max_size_in_bytes;
}
void reset(std::lock_guard<std::mutex> & /* cache_lock */) override
{
queue.clear();
cells.clear();
current_size = 0;
current_size_in_bytes = 0;
}
void remove(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) override
@ -64,7 +63,7 @@ public:
if (it == cells.end())
return;
auto & cell = it->second;
current_size -= cell.size;
current_size_in_bytes -= cell.size;
queue.erase(cell.queue_iterator);
cells.erase(it);
}
@ -73,9 +72,7 @@ public:
{
auto it = cells.find(key);
if (it == cells.end())
{
return MappedPtr();
}
return {};
Cell & cell = it->second;
@ -85,6 +82,20 @@ public:
return cell.value;
}
std::optional<KeyMapped> getWithKey(const Key & key, std::lock_guard<std::mutex> & /*cache_lock*/) override
{
auto it = cells.find(key);
if (it == cells.end())
return std::nullopt;
Cell & cell = it->second;
/// Move the key to the end of the queue. The iterator remains valid.
queue.splice(queue.end(), queue, cell.queue_iterator);
return std::make_optional<KeyMapped>({it->first, cell.value});
}
void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /* cache_lock */) override
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
@ -107,18 +118,26 @@ public:
}
else
{
current_size -= cell.size;
current_size_in_bytes -= cell.size;
queue.splice(queue.end(), queue, cell.queue_iterator);
}
cell.value = mapped;
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
current_size_in_bytes += cell.size;
removeOverflow();
}
protected:
std::vector<KeyMapped> dump() const override
{
std::vector<KeyMapped> res;
for (const auto & [key, cell] : cells)
res.push_back({key, cell.value});
return res;
}
private:
using LRUQueue = std::list<Key>;
using LRUQueueIterator = typename LRUQueue::iterator;
@ -136,18 +155,19 @@ protected:
Cells cells;
/// Total weight of values.
size_t current_size = 0;
const size_t max_size;
const size_t max_elements_size;
size_t current_size_in_bytes = 0;
const size_t max_size_in_bytes;
const size_t max_count;
WeightFunction weight_function;
OnWeightLossFunction on_weight_loss_function;
void removeOverflow()
{
size_t current_weight_lost = 0;
size_t queue_size = cells.size();
while ((current_size > max_size || (max_elements_size != 0 && queue_size > max_elements_size)) && (queue_size > 0))
while ((current_size_in_bytes > max_size_in_bytes || (max_count != 0 && queue_size > max_count)) && (queue_size > 0))
{
const Key & key = queue.front();
@ -160,7 +180,7 @@ protected:
const auto & cell = it->second;
current_size -= cell.size;
current_size_in_bytes -= cell.size;
current_weight_lost += cell.size;
cells.erase(it);
@ -168,9 +188,9 @@ protected:
--queue_size;
}
Base::on_weight_loss_function(current_weight_lost);
on_weight_loss_function(current_weight_lost);
if (current_size > (1ull << 63))
if (current_size_in_bytes > (1ull << 63))
{
LOG_ERROR(&Poco::Logger::get("LRUCache"), "LRUCache became inconsistent. There must be a bug in it.");
abort();

View File

@ -14,37 +14,35 @@ namespace DB
/// this policy protects entries which were used more than once from a sequential scan.
/// WeightFunction is a functor that takes Mapped as a parameter and returns "weight" (approximate size)
/// of that value.
/// Cache starts to evict entries when their total weight exceeds max_size.
/// Cache starts to evict entries when their total weight exceeds max_size_in_bytes.
/// Value weight should not change after insertion.
/// To work with the thread-safe implementation of this class use a class "CacheBase" with first parameter "SLRU"
/// and next parameters in the same order as in the constructor of the current class.
template <typename TKey, typename TMapped, typename HashFunction = std::hash<TKey>, typename WeightFunction = TrivialWeightFunction<TMapped>>
class SLRUCachePolicy : public ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>
template <typename Key, typename Mapped, typename HashFunction = std::hash<Key>, typename WeightFunction = EqualWeightFunction<Mapped>>
class SLRUCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunction>
{
public:
using Key = TKey;
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
using Base = ICachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
using Base = ICachePolicy<Key, Mapped, HashFunction, WeightFunction>;
using typename Base::MappedPtr;
using typename Base::KeyMapped;
using typename Base::OnWeightLossFunction;
/** Initialize SLRUCachePolicy with max_size and max_protected_size.
/** Initialize SLRUCachePolicy with max_size_in_bytes and max_protected_size.
* max_protected_size shows how many of the most frequently used entries will not be evicted after a sequential scan.
* max_protected_size == 0 means that the default protected size is equal to half of the total max size.
*/
/// TODO: construct from special struct with cache policy parameters (also with max_protected_size).
SLRUCachePolicy(size_t max_size_, size_t max_elements_size_ = 0, double size_ratio = 0.5, OnWeightLossFunction on_weight_loss_function_ = {})
: max_protected_size(static_cast<size_t>(max_size_ * std::min(1.0, size_ratio)))
, max_size(max_size_)
, max_elements_size(max_elements_size_)
{
Base::on_weight_loss_function = on_weight_loss_function_;
}
SLRUCachePolicy(size_t max_size_in_bytes_, size_t max_count_, double size_ratio, OnWeightLossFunction on_weight_loss_function_)
: max_protected_size(static_cast<size_t>(max_size_in_bytes_ * std::min(1.0, size_ratio)))
, max_size_in_bytes(max_size_in_bytes_)
, max_count(max_count_)
, on_weight_loss_function(on_weight_loss_function_)
{
}
size_t weight(std::lock_guard<std::mutex> & /* cache_lock */) const override
{
return current_size;
return current_size_in_bytes;
}
size_t count(std::lock_guard<std::mutex> & /* cache_lock */) const override
@ -52,9 +50,9 @@ public:
return cells.size();
}
size_t maxSize() const override
size_t maxSize(std::lock_guard<std::mutex> & /* cache_lock */) const override
{
return max_size;
return max_size_in_bytes;
}
void reset(std::lock_guard<std::mutex> & /* cache_lock */) override
@ -62,7 +60,7 @@ public:
cells.clear();
probationary_queue.clear();
protected_queue.clear();
current_size = 0;
current_size_in_bytes = 0;
current_protected_size = 0;
}
@ -72,7 +70,7 @@ public:
if (it == cells.end())
return;
auto & cell = it->second;
current_size -= cell.size;
current_size_in_bytes -= cell.size;
if (cell.is_protected)
{
current_protected_size -= cell.size;
@ -86,16 +84,12 @@ public:
{
auto it = cells.find(key);
if (it == cells.end())
{
return MappedPtr();
}
return {};
Cell & cell = it->second;
if (cell.is_protected)
{
protected_queue.splice(protected_queue.end(), protected_queue, cell.queue_iterator);
}
else
{
cell.is_protected = true;
@ -107,6 +101,27 @@ public:
return cell.value;
}
std::optional<KeyMapped> getWithKey(const Key & key, std::lock_guard<std::mutex> & /*cache_lock*/) override
{
auto it = cells.find(key);
if (it == cells.end())
return std::nullopt;
Cell & cell = it->second;
if (cell.is_protected)
protected_queue.splice(protected_queue.end(), protected_queue, cell.queue_iterator);
else
{
cell.is_protected = true;
current_protected_size += cell.size;
protected_queue.splice(protected_queue.end(), probationary_queue, cell.queue_iterator);
removeOverflow(protected_queue, max_protected_size, current_protected_size, /*is_protected=*/true);
}
return std::make_optional<KeyMapped>({it->first, cell.value});
}
void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /* cache_lock */) override
{
auto [it, inserted] = cells.emplace(std::piecewise_construct,
@ -129,7 +144,7 @@ public:
}
else
{
current_size -= cell.size;
current_size_in_bytes -= cell.size;
if (cell.is_protected)
{
current_protected_size -= cell.size;
@ -144,14 +159,22 @@ public:
cell.value = mapped;
cell.size = cell.value ? weight_function(*cell.value) : 0;
current_size += cell.size;
current_size_in_bytes += cell.size;
current_protected_size += cell.is_protected ? cell.size : 0;
removeOverflow(protected_queue, max_protected_size, current_protected_size, /*is_protected=*/true);
removeOverflow(probationary_queue, max_size, current_size, /*is_protected=*/false);
removeOverflow(probationary_queue, max_size_in_bytes, current_size_in_bytes, /*is_protected=*/false);
}
protected:
std::vector<KeyMapped> dump() const override
{
std::vector<KeyMapped> res;
for (const auto & [key, cell] : cells)
res.push_back({key, cell.value});
return res;
}
private:
using SLRUQueue = std::list<Key>;
using SLRUQueueIterator = typename SLRUQueue::iterator;
@ -171,12 +194,13 @@ protected:
Cells cells;
size_t current_protected_size = 0;
size_t current_size = 0;
size_t current_size_in_bytes = 0;
const size_t max_protected_size;
const size_t max_size;
const size_t max_elements_size;
const size_t max_size_in_bytes;
const size_t max_count;
WeightFunction weight_function;
OnWeightLossFunction on_weight_loss_function;
void removeOverflow(SLRUQueue & queue, const size_t max_weight_size, size_t & current_weight_size, bool is_protected)
{
@ -188,11 +212,11 @@ protected:
{
/// Check if after removing all elements from the probationary part there will be no more than max elements
/// in the protected queue and the weight of all protected elements will be less than the max protected weight.
/// It's not possible to check only cells.size() > max_elements_size
/// It's not possible to check only cells.size() > max_count
/// because protected elements move to probationary part and still remain in cache.
need_remove = [&]()
{
return ((max_elements_size != 0 && cells.size() - probationary_queue.size() > max_elements_size)
return ((max_count != 0 && cells.size() - probationary_queue.size() > max_count)
|| (current_weight_size > max_weight_size)) && (queue_size > 0);
};
}
@ -200,7 +224,7 @@ protected:
{
need_remove = [&]()
{
return ((max_elements_size != 0 && cells.size() > max_elements_size)
return ((max_count != 0 && cells.size() > max_count)
|| (current_weight_size > max_weight_size)) && (queue_size > 0);
};
}
@ -236,11 +260,9 @@ protected:
}
if (!is_protected)
{
Base::on_weight_loss_function(current_weight_lost);
}
on_weight_loss_function(current_weight_lost);
if (current_size > (1ull << 63))
if (current_size_in_bytes > (1ull << 63))
{
LOG_ERROR(&Poco::Logger::get("SLRUCache"), "SLRUCache became inconsistent. There must be a bug in it.");
abort();

148
src/Common/TTLCachePolicy.h Normal file
View File

@ -0,0 +1,148 @@
#pragma once
#include <Common/ICachePolicy.h>
#include <unordered_map>
namespace DB
{
/// TTLCachePolicy evicts entries for which IsStaleFunction returns true.
/// IsStaleFunction is invoked on the key only, and only from set() when the new entry would not fit. Lookups (get() / getWithKey())
/// return whatever is stored without consulting IsStaleFunction.
/// The cache size (in bytes and number of entries) can be changed at runtime. It is expected to set both sizes explicitly after
/// construction: both limits start at 0, and with max_count == 0 or max_size_in_bytes == 0 set() cannot insert anything.
template <typename Key, typename Mapped, typename HashFunction, typename WeightFunction, typename IsStaleFunction>
class TTLCachePolicy : public ICachePolicy<Key, Mapped, HashFunction, WeightFunction>
{
public:
    using Base = ICachePolicy<Key, Mapped, HashFunction, WeightFunction>;
    using typename Base::MappedPtr;
    using typename Base::KeyMapped;
    using typename Base::OnWeightLossFunction;

    /// Creates an empty policy with both limits set to 0 (see class comment).
    TTLCachePolicy()
        : max_size_in_bytes(0)
        , max_count(0)
    {
    }

    /// Sum of the weights of all stored entries.
    size_t weight(std::lock_guard<std::mutex> & /* cache_lock */) const override
    {
        return size_in_bytes;
    }

    /// Number of stored entries (stale ones included).
    size_t count(std::lock_guard<std::mutex> & /* cache_lock */) const override
    {
        return cache.size();
    }

    size_t maxSize(std::lock_guard<std::mutex> & /* cache_lock */) const override
    {
        return max_size_in_bytes;
    }

    void setMaxCount(size_t max_count_, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        /// lazy behavior: the cache only shrinks upon the next insert
        max_count = max_count_;
    }

    void setMaxSize(size_t max_size_in_bytes_, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        /// lazy behavior: the cache only shrinks upon the next insert
        max_size_in_bytes = max_size_in_bytes_;
    }

    /// Drops all entries. The configured limits are kept.
    void reset(std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        cache.clear();
        /// Keep the byte accounting in sync with the (now empty) map. Otherwise weight() would keep reporting the old total and
        /// set() would refuse new entries because the cache looks full.
        size_in_bytes = 0;
    }

    /// Removes the entry for key, if any, and updates the byte accounting.
    void remove(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        auto it = cache.find(key);
        if (it == cache.end())
            return;
        size_in_bytes -= weight_function(*it->second);
        cache.erase(it);
    }

    /// Returns the stored value or an empty pointer if key is not cached. Does not check staleness.
    MappedPtr get(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        auto it = cache.find(key);
        if (it == cache.end())
            return {};
        return it->second;
    }

    /// Like get() but also returns the key object stored in the cache (which may differ from the provided key in data that does not
    /// take part in hashing/equality). Does not check staleness.
    std::optional<KeyMapped> getWithKey(const Key & key, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        auto it = cache.find(key);
        if (it == cache.end())
            return std::nullopt;
        return std::make_optional<KeyMapped>({it->first, it->second});
    }

    /// Evicts on a best-effort basis. If there are too many non-stale entries, the new entry may not be cached at all!
    /// Note: the space check does not account for the space freed when an entry for key is already present.
    void set(const Key & key, const MappedPtr & mapped, std::lock_guard<std::mutex> & /* cache_lock */) override
    {
        chassert(mapped.get());

        const size_t entry_size_in_bytes = weight_function(*mapped);

        auto sufficient_space_in_cache = [&]()
        {
            return (size_in_bytes + entry_size_in_bytes <= max_size_in_bytes) && (cache.size() + 1 <= max_count);
        };

        if (!sufficient_space_in_cache())
        {
            /// Remove stale entries
            for (auto it = cache.begin(); it != cache.end();)
            {
                if (is_stale_function(it->first))
                {
                    size_in_bytes -= weight_function(*it->second);
                    it = cache.erase(it);
                }
                else
                    ++it;
            }
        }

        if (sufficient_space_in_cache())
        {
            /// Insert or replace key. An existing entry is erased explicitly instead of being overwritten via operator[]:
            /// assigning through operator[] would keep the old key object, and the key may carry data which does not take part
            /// in hashing/equality and must be refreshed together with the value.
            if (auto it = cache.find(key); it != cache.end())
            {
                size_in_bytes -= weight_function(*it->second);
                cache.erase(it);
            }

            /// 'mapped' is a const reference, so it is copied (std::move would silently degrade to a copy anyway).
            cache.emplace(key, mapped);
            size_in_bytes += entry_size_in_bytes;
        }
    }

    /// Returns a snapshot of all stored (key, value) pairs in unspecified order.
    std::vector<KeyMapped> dump() const override
    {
        std::vector<KeyMapped> res;
        res.reserve(cache.size());
        for (const auto & [key, mapped] : cache)
            res.push_back({key, mapped});
        return res;
    }

private:
    using Cache = std::unordered_map<Key, MappedPtr, HashFunction>;
    Cache cache;

    /// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
    /// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
    /// binary search on the sorted container and erase all left of the found key.

    /// Total weight of all entries currently in 'cache'.
    size_t size_in_bytes = 0;
    size_t max_size_in_bytes;
    size_t max_count;

    WeightFunction weight_function;
    IsStaleFunction is_stale_function;
    /// TODO support OnWeightLossFunction callback
};
}

View File

@ -6,7 +6,7 @@
TEST(LRUCache, set)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(2, std::make_shared<int>(3));
@ -19,7 +19,7 @@ TEST(LRUCache, set)
TEST(LRUCache, update)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(1, std::make_shared<int>(3));
auto val = lru_cache.get(1);
@ -30,7 +30,7 @@ TEST(LRUCache, update)
TEST(LRUCache, get)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);
lru_cache.set(1, std::make_shared<int>(2));
lru_cache.set(2, std::make_shared<int>(3));
SimpleCacheBase::MappedPtr value = lru_cache.get(1);
@ -50,7 +50,7 @@ struct ValueWeight
TEST(LRUCache, evictOnSize)
{
using SimpleCacheBase = DB::CacheBase<int, size_t>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 20, /*max_elements_size*/ 3, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 20, /*max_count*/ 3);
lru_cache.set(1, std::make_shared<size_t>(2));
lru_cache.set(2, std::make_shared<size_t>(3));
lru_cache.set(3, std::make_shared<size_t>(4));
@ -66,7 +66,7 @@ TEST(LRUCache, evictOnSize)
TEST(LRUCache, evictOnWeight)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);
lru_cache.set(1, std::make_shared<size_t>(2));
lru_cache.set(2, std::make_shared<size_t>(3));
lru_cache.set(3, std::make_shared<size_t>(4));
@ -87,7 +87,7 @@ TEST(LRUCache, evictOnWeight)
TEST(LRUCache, getOrSet)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto lru_cache = SimpleCacheBase(/*max_size*/ 10, /*max_elements_size*/ 10, "LRU");
auto lru_cache = SimpleCacheBase("LRU", /*max_size_in_bytes*/ 10, /*max_count*/ 10);
size_t x = 10;
auto load_func = [&] { return std::make_shared<size_t>(x); };
auto [value, loaded] = lru_cache.getOrSet(1, load_func);

View File

@ -6,7 +6,7 @@
TEST(SLRUCache, set)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(2, std::make_shared<int>(3));
@ -19,7 +19,7 @@ TEST(SLRUCache, set)
TEST(SLRUCache, update)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(1, std::make_shared<int>(3));
@ -31,7 +31,7 @@ TEST(SLRUCache, update)
TEST(SLRUCache, get)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(2, std::make_shared<int>(3));
@ -47,7 +47,7 @@ TEST(SLRUCache, get)
TEST(SLRUCache, remove)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(2, std::make_shared<int>(3));
@ -63,7 +63,7 @@ TEST(SLRUCache, remove)
TEST(SLRUCache, removeFromProtected)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/2, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/2, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(1, std::make_shared<int>(3));
@ -96,7 +96,7 @@ TEST(SLRUCache, removeFromProtected)
TEST(SLRUCache, reset)
{
using SimpleCacheBase = DB::CacheBase<int, int>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<int>(2));
slru_cache.set(2, std::make_shared<int>(3));
@ -119,7 +119,7 @@ struct ValueWeight
TEST(SLRUCache, evictOnElements)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/1, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase(/*max_size_in_bytes=*/10, /*max_count=*/1, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<size_t>(2));
slru_cache.set(2, std::make_shared<size_t>(3));
@ -140,7 +140,7 @@ TEST(SLRUCache, evictOnElements)
TEST(SLRUCache, evictOnWeight)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase(/*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<size_t>(2));
slru_cache.set(2, std::make_shared<size_t>(3));
slru_cache.set(3, std::make_shared<size_t>(4));
@ -161,7 +161,7 @@ TEST(SLRUCache, evictOnWeight)
TEST(SLRUCache, evictFromProtectedPart)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<size_t>(2));
slru_cache.set(1, std::make_shared<size_t>(2));
@ -177,7 +177,7 @@ TEST(SLRUCache, evictFromProtectedPart)
TEST(SLRUCache, evictStreamProtected)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
slru_cache.set(1, std::make_shared<size_t>(2));
slru_cache.set(1, std::make_shared<size_t>(2));
@ -201,7 +201,7 @@ TEST(SLRUCache, evictStreamProtected)
TEST(SLRUCache, getOrSet)
{
using SimpleCacheBase = DB::CacheBase<int, size_t, std::hash<int>, ValueWeight>;
auto slru_cache = SimpleCacheBase(/*max_size=*/10, /*max_elements_size=*/0, "SLRU", /*size_ratio*/0.5);
auto slru_cache = SimpleCacheBase("SLRU", /*max_size_in_bytes=*/10, /*max_count=*/0, /*size_ratio*/0.5);
size_t x = 5;
auto load_func = [&] { return std::make_shared<size_t>(x); };
auto [value, loaded] = slru_cache.getOrSet(1, load_func);

View File

@ -42,8 +42,11 @@ private:
using Base = CacheBase<UInt128, UncompressedCacheCell, UInt128TrivialHash, UncompressedSizeWeightFunction>;
public:
/// Constructor taking a byte limit and an optional policy name; forwards (size, 0, policy) to Base.
/// NOTE(review): this listing appears to interleave pre- and post-change lines of a diff. This overload would be
/// ambiguous with the size-only overload below for a one-argument call, so presumably only one of them is live — confirm.
explicit UncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy = "")
: Base(max_size_in_bytes, 0, uncompressed_cache_policy) {}
/// Size-only constructor: passes just the byte limit to Base, no policy name is given.
explicit UncompressedCache(size_t max_size_in_bytes)
: Base(max_size_in_bytes) {}
/// Policy-first constructor: forwards the policy name and the byte limit to Base, in that order.
UncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes)
: Base(uncompressed_cache_policy, max_size_in_bytes) {}
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file, size_t offset)

View File

@ -10,6 +10,7 @@
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Common/TTLCachePolicy.h>
#include <Core/Settings.h>
#include <base/defines.h> /// chassert
@ -152,43 +153,32 @@ size_t QueryCache::KeyHasher::operator()(const Key & key) const
return res;
}
/// Computes the size of a query result as the sum of allocatedBytes() over all of its chunks.
/// NOTE(review): two signatures and two loop heads are listed (member function on a chunk pointer vs. functor on a
/// chunk container); they look like the before/after forms of the same routine — only one pair can be live, confirm.
size_t QueryCache::QueryResult::sizeInBytes() const
size_t QueryCache::QueryResultWeight::operator()(const QueryResult & chunks) const
{
size_t res = 0;
for (const auto & chunk : *chunks)
for (const auto & chunk : chunks)
res += chunk.allocatedBytes();
return res;
};
}
/// Staleness predicate: a key is stale when its expires_at lies before the current system_clock time.
/// NOTE(review): both a file-local lambda (`is_stale`, inside an anonymous namespace) and a member functor
/// (`IsStale::operator()`) are listed with one shared body; presumably the functor replaces the lambda — confirm.
namespace
{
auto is_stale = [](const QueryCache::Key & key)
bool QueryCache::IsStale::operator()(const Key & key) const
{
return (key.expires_at < std::chrono::system_clock::now());
};
}
/// Constructs a Writer for `key` and stores the per-entry limits and the minimum query runtime.
/// The constructor already probes the cache: if a non-stale entry for the key exists, skip_insert is set so that the
/// result of this query will not replace it.
/// NOTE(review): this listing interleaves lines from before and after a refactoring (two parameter lists, two
/// initializer-list heads, two lookup conditions, two log statements); only one of each pair can be live — confirm
/// against the real file.
QueryCache::Writer::Writer(std::mutex & mutex_, Cache & cache_, const Key & key_,
size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_,
size_t max_cache_entries_,
QueryCache::Writer::Writer(Cache & cache_, const Key & key_,
size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_,
std::chrono::milliseconds min_query_runtime_)
: mutex(mutex_)
, cache(cache_)
: cache(cache_)
, key(key_)
, cache_size_in_bytes(cache_size_in_bytes_)
, max_cache_size_in_bytes(max_cache_size_in_bytes_)
, max_cache_entries(max_cache_entries_)
, max_entry_size_in_bytes(max_entry_size_in_bytes_)
, max_entry_size_in_rows(max_entry_size_in_rows_)
, min_query_runtime(min_query_runtime_)
{
/// Look the key up once at construction time; an expired entry does not block the insert.
if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first))
if (auto entry = cache.getWithKey(key); entry.has_value() && !IsStale()(entry->key))
{
skip_insert = true; /// Key already contained in cache and did not expire yet --> don't replace it
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
}
}
@ -197,18 +187,20 @@ void QueryCache::Writer::buffer(Chunk && partial_query_result)
if (skip_insert)
return;
auto & chunks = query_result.chunks;
std::lock_guard lock(mutex);
chunks->emplace_back(std::move(partial_query_result));
auto & chunks = *query_result;
new_entry_size_in_bytes += chunks->back().allocatedBytes();
new_entry_size_in_rows += chunks->back().getNumRows();
chunks.emplace_back(std::move(partial_query_result));
new_entry_size_in_bytes += chunks.back().allocatedBytes();
new_entry_size_in_rows += chunks.back().getNumRows();
if ((new_entry_size_in_bytes > max_entry_size_in_bytes) || (new_entry_size_in_rows > max_entry_size_in_rows))
{
chunks->clear(); /// eagerly free some space
chunks.clear(); /// eagerly free some space
skip_insert = true;
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query result too big), new_entry_size_in_bytes: {} ({}), new_entry_size_in_rows: {} ({}), query: {}", new_entry_size_in_bytes, max_entry_size_in_bytes, new_entry_size_in_rows, max_entry_size_in_rows, key.queryStringFromAst());
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (query result too big), new_entry_size_in_bytes: {} ({}), new_entry_size_in_rows: {} ({}), query: {}", new_entry_size_in_bytes, max_entry_size_in_bytes, new_entry_size_in_rows, max_entry_size_in_rows, key.queryStringFromAst());
}
}
@ -217,81 +209,47 @@ void QueryCache::Writer::finalizeWrite()
if (skip_insert)
return;
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - query_start_time) < min_query_runtime)
{
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (query not expensive enough), query: {}", key.queryStringFromAst());
return;
}
std::lock_guard lock(mutex);
if (auto it = cache.find(key); it != cache.end() && !is_stale(it->first))
if (std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - query_start_time) < min_query_runtime)
{
/// same check as in ctor because a parallel Writer could have inserted the current key in the meantime
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (query not expensive enough), query: {}", key.queryStringFromAst());
return;
}
auto sufficient_space_in_cache = [this]() TSA_REQUIRES(mutex)
if (auto entry = cache.getWithKey(key); entry.has_value() && !IsStale()(entry->key))
{
return (cache_size_in_bytes + new_entry_size_in_bytes <= max_cache_size_in_bytes) && (cache.size() + 1 <= max_cache_entries);
};
if (!sufficient_space_in_cache())
{
size_t removed_items = 0;
/// Remove stale entries
for (auto it = cache.begin(); it != cache.end();)
if (is_stale(it->first))
{
cache_size_in_bytes -= it->second.sizeInBytes();
it = cache.erase(it);
++removed_items;
}
else
++it;
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Removed {} stale entries", removed_items);
/// same check as in ctor because a parallel Writer could have inserted the current key in the meantime
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Skipped insert (non-stale entry found), query: {}", key.queryStringFromAst());
return;
}
if (!sufficient_space_in_cache())
LOG_TRACE(&Poco::Logger::get("QueryResultCache"), "Skipped insert (cache has insufficient space), query: {}", key.queryStringFromAst());
else
{
//// Insert or replace key
cache_size_in_bytes += query_result.sizeInBytes();
if (auto it = cache.find(key); it != cache.end())
cache_size_in_bytes -= it->second.sizeInBytes(); // key replacement
cache[key] = std::move(query_result);
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stored result of query {}", key.queryStringFromAst());
}
cache.set(key, query_result);
}
/// Looks up `key` in the cache and, when a usable entry exists, assigns `pipe` a source that reads the cached chunks.
/// The constructor returns early without assigning `pipe` when there is no entry, when the entry carries a different
/// user name, or when the entry is stale. Every outcome is reported via LOG_TRACE.
/// The unnamed lock_guard parameter is not used in the body; createReader() passes the guard it holds on the mutex.
/// NOTE(review): iterator-based (`it`) and optional-based (`entry`) lines are interleaved here; they look like the
/// before/after forms of the same statements — only one of each pair can be live, confirm.
QueryCache::Reader::Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_, const std::lock_guard<std::mutex> &)
QueryCache::Reader::Reader(Cache & cache_, const Key & key, const std::lock_guard<std::mutex> &)
{
auto it = cache_.find(key);
auto entry = cache_.getWithKey(key);
/// Case 1: nothing is cached for this key.
if (it == cache_.end())
if (!entry.has_value())
{
LOG_TRACE(&Poco::Logger::get("QueryCache"), "No entry found for query {}", key.queryStringFromAst());
return;
}
/// Case 2: the entry carries a user name and it differs from the user name of the requested key.
if (it->first.username.has_value() && it->first.username != key.username)
if (entry->key.username.has_value() && entry->key.username != key.username)
{
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Inaccessible entry found for query {}", key.queryStringFromAst());
return;
}
/// Case 3: the entry is stale. The iterator-based form also erases it and adjusts the byte counter;
/// the optional-based form only logs and returns.
if (is_stale(it->first))
if (IsStale()(entry->key))
{
cache_size_in_bytes_ -= it->second.sizeInBytes();
const_cast<Cache &>(cache_).erase(it);
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stale entry found and removed for query {}", key.queryStringFromAst());
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Stale entry found for query {}", key.queryStringFromAst());
return;
}
/// Usable entry: wrap the cached chunks together with the header stored in the cached key into a source.
pipe = Pipe(std::make_shared<SourceFromChunks>(it->first.header, it->second.chunks));
pipe = Pipe(std::make_shared<SourceFromChunks>(entry->key.header, entry->mapped));
LOG_TRACE(&Poco::Logger::get("QueryCache"), "Entry found for query {}", key.queryStringFromAst());
}
@ -316,19 +274,19 @@ Pipe && QueryCache::Reader::getPipe()
/// Creates a Reader for `key` while holding the mutex; the held guard is handed to the Reader constructor.
/// NOTE(review): two return statements are listed (with and without cache_size_in_bytes); presumably only one is live — confirm.
QueryCache::Reader QueryCache::createReader(const Key & key)
{
std::lock_guard lock(mutex);
return Reader(cache, key, cache_size_in_bytes, lock);
return Reader(cache, key, lock);
}
/// Creates a Writer for `key` while holding the mutex, passing on the configured per-entry limits (bytes and rows)
/// and the caller-supplied minimum query runtime.
/// NOTE(review): two return statements are listed (long and short Writer argument lists); presumably only one is live — confirm.
QueryCache::Writer QueryCache::createWriter(const Key & key, std::chrono::milliseconds min_query_runtime)
{
std::lock_guard lock(mutex);
return Writer(mutex, cache, key, cache_size_in_bytes, max_cache_size_in_bytes, max_cache_entries, max_cache_entry_size_in_bytes, max_cache_entry_size_in_rows, min_query_runtime);
return Writer(cache, key, max_entry_size_in_bytes, max_entry_size_in_rows, min_query_runtime);
}
/// Empties the cache and forgets all recorded execution counts.
/// NOTE(review): both `cache.reset()` (before taking the mutex) and `cache.clear()` plus a manual byte-counter reset
/// (after taking it) are listed; they look like the after/before forms of the same step — confirm which is live.
void QueryCache::reset()
{
cache.reset();
/// The mutex is taken before times_executed is cleared.
std::lock_guard lock(mutex);
cache.clear();
times_executed.clear();
cache_size_in_bytes = 0;
}
@ -344,13 +302,28 @@ size_t QueryCache::recordQueryRun(const Key & key)
return times;
}
/// Returns the result of cache.dump() unchanged, as a vector of Cache::KeyMapped.
/// This function does not take QueryCache's own mutex.
std::vector<QueryCache::Cache::KeyMapped> QueryCache::dump() const
{
return cache.dump();
}
/// Constructs the cache with a TTLCachePolicy instantiated with the query cache's key, result, hasher, weight and
/// staleness types. No size limits are passed here; updateConfiguration() sets them.
QueryCache::QueryCache()
: cache(std::make_unique<TTLCachePolicy<Key, QueryResult, KeyHasher, QueryResultWeight, IsStale>>())
{
}
/// Reads the query cache limits from `config` while holding the mutex. Keys and their fallback values:
///   query_cache.size            1_GiB       total cache size in bytes
///   query_cache.max_entries     1024        number of cache entries
///   query_cache.max_entry_size  1_MiB       size of a single entry in bytes
///   query_cache.max_entry_rows  30'000'000  rows of a single entry
/// NOTE(review): each key is read twice in this listing (once into a max_cache_* member, once into a local or a
/// max_entry_* member); these look like the before/after forms of the same reads — confirm which are live.
void QueryCache::updateConfiguration(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard lock(mutex);
max_cache_size_in_bytes = config.getUInt64("query_cache.size", 1_GiB);
max_cache_entries = config.getUInt64("query_cache.max_entries", 1024);
max_cache_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size", 1_MiB);
max_cache_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows", 30'000'000);
/// The two cache-wide limits are handed to the cache object itself.
size_t max_size_in_bytes = config.getUInt64("query_cache.size", 1_GiB);
cache.setMaxSize(max_size_in_bytes);
size_t max_entries = config.getUInt64("query_cache.max_entries", 1024);
cache.setMaxCount(max_entries);
/// The per-entry limits are kept as members.
max_entry_size_in_bytes = config.getUInt64("query_cache.max_entry_size", 1_MiB);
max_entry_size_in_rows = config.getUInt64("query_cache.max_entry_rows", 30'000'000);
}
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/CacheBase.h>
#include <Core/Block.h>
#include <Parsers/IAST_fwd.h>
#include <Poco/Util/LayeredConfiguration.h>
@ -41,7 +42,7 @@ public:
/// Result metadata for constructing the pipe.
const Block header;
/// Std::nullopt means that the associated entry can be read by other users. In general, sharing is a bad idea: First, it is
/// std::nullopt means that the associated entry can be read by other users. In general, sharing is a bad idea: First, it is
/// unlikely that different users pose the same queries. Second, sharing potentially breaches security. E.g. User A should not be
/// able to bypass row policies on some table by running the same queries as user B for whom no row policies exist.
const std::optional<String> username;
@ -57,15 +58,7 @@ public:
String queryStringFromAst() const;
};
struct QueryResult
{
std::shared_ptr<Chunks> chunks = std::make_shared<Chunks>();
size_t sizeInBytes() const;
/// Notes: 1. For performance reasons, we cache the original result chunks as-is (no concatenation during cache insert or lookup).
/// 2. Ref-counting (shared_ptr) ensures that eviction of an entry does not affect queries which still read from the cache.
/// (this can also be achieved by copying the chunks during lookup but that would be under the cache lock --> too slow)
};
using QueryResult = Chunks;
private:
struct KeyHasher
@ -73,8 +66,18 @@ private:
size_t operator()(const Key & key) const;
};
/// Weight functor: maps a query result to a size_t. It is used as the weight-function template argument of the
/// Cache alias.
struct QueryResultWeight
{
size_t operator()(const QueryResult & chunks) const;
};
/// Predicate functor on cache keys; returns a bool for a given Key.
/// NOTE(review): the name suggests "true" means the entry has expired — confirm against the definition.
struct IsStale
{
bool operator()(const Key & key) const;
};
/// query --> query result
using Cache = std::unordered_map<Key, QueryResult, KeyHasher>;
using Cache = CacheBase<Key, QueryResult, KeyHasher, QueryResultWeight>;
/// query --> query execution count
using TimesExecuted = std::unordered_map<Key, size_t, KeyHasher>;
@ -97,24 +100,19 @@ public:
void buffer(Chunk && partial_query_result);
void finalizeWrite();
private:
std::mutex & mutex;
Cache & cache TSA_GUARDED_BY(mutex);
std::mutex mutex;
Cache & cache;
const Key key;
size_t & cache_size_in_bytes TSA_GUARDED_BY(mutex);
const size_t max_cache_size_in_bytes;
const size_t max_cache_entries;
size_t new_entry_size_in_bytes = 0;
size_t new_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
const size_t max_entry_size_in_bytes;
size_t new_entry_size_in_rows = 0;
size_t new_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
const size_t max_entry_size_in_rows;
const std::chrono::time_point<std::chrono::system_clock> query_start_time = std::chrono::system_clock::now(); /// Writer construction and finalizeWrite() coincide with query start/end
const std::chrono::milliseconds min_query_runtime;
QueryResult query_result;
std::shared_ptr<QueryResult> query_result TSA_GUARDED_BY(mutex) = std::make_shared<QueryResult>();
std::atomic<bool> skip_insert = false;
Writer(std::mutex & mutex_, Cache & cache_, const Key & key_,
size_t & cache_size_in_bytes_, size_t max_cache_size_in_bytes_,
size_t max_cache_entries_,
Writer(Cache & cache_, const Key & key_,
size_t max_entry_size_in_bytes_, size_t max_entry_size_in_rows_,
std::chrono::milliseconds min_query_runtime_);
@ -128,11 +126,13 @@ public:
bool hasCacheEntryForKey() const;
Pipe && getPipe(); /// must be called only if hasCacheEntryForKey() returns true
private:
Reader(const Cache & cache_, const Key & key, size_t & cache_size_in_bytes_, const std::lock_guard<std::mutex> &);
Reader(Cache & cache_, const Key & key, const std::lock_guard<std::mutex> &);
Pipe pipe;
friend class QueryCache; /// for createReader()
};
QueryCache();
void updateConfiguration(const Poco::Util::AbstractConfiguration & config);
Reader createReader(const Key & key);
@ -143,23 +143,18 @@ public:
/// Record new execution of query represented by key. Returns number of executions so far.
size_t recordQueryRun(const Key & key);
/// For debugging and system tables
std::vector<QueryCache::Cache::KeyMapped> dump() const;
private:
/// Implementation note: The query result implements a custom caching mechanism and doesn't make use of CacheBase, unlike many other
/// internal caches in ClickHouse. The main reason is that we don't need standard CacheBase (S)LRU eviction as the expiry times
/// associated with cache entries provide a "natural" eviction criterion. As a future TODO, we could make an expiry-based eviction
/// policy and use that with CacheBase (e.g. see #23706)
/// TODO To speed up removal of stale entries, we could also add another container sorted on expiry times which maps keys to iterators
/// into the cache. To insert an entry, add it to the cache + add the iterator to the sorted container. To remove stale entries, do a
/// binary search on the sorted container and erase all left of the found key.
Cache cache;
mutable std::mutex mutex;
Cache cache TSA_GUARDED_BY(mutex);
TimesExecuted times_executed TSA_GUARDED_BY(mutex);
/// Cache configuration
size_t max_cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_cache_entries TSA_GUARDED_BY(mutex) = 0;
size_t max_cache_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_cache_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
size_t max_entry_size_in_bytes TSA_GUARDED_BY(mutex) = 0;
size_t max_entry_size_in_rows TSA_GUARDED_BY(mutex) = 0;
size_t cache_size_in_bytes TSA_GUARDED_BY(mutex) = 0; /// Updated in each cache insert/delete

View File

@ -1954,14 +1954,14 @@ QueryStatusPtr Context::getProcessListElement() const
}
/// Creates the shared uncompressed cache under the context lock.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR if the cache has already been created.
/// NOTE(review): two signatures and two make_shared calls are listed, differing only in argument order
/// (size first vs. policy first); presumably only the policy-first pair is live — confirm.
void Context::setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy)
void Context::setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes)
{
auto lock = getLock();
if (shared->uncompressed_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Uncompressed cache has been already created.");
shared->uncompressed_cache = std::make_shared<UncompressedCache>(max_size_in_bytes, uncompressed_cache_policy);
shared->uncompressed_cache = std::make_shared<UncompressedCache>(uncompressed_cache_policy, max_size_in_bytes);
}
@ -1980,14 +1980,14 @@ void Context::dropUncompressedCache() const
}
/// Creates the shared mark cache under the context lock.
/// Throws an Exception with ErrorCodes::LOGICAL_ERROR if the cache has already been created.
/// NOTE(review): two signatures and two make_shared calls are listed, differing only in argument order
/// (size first vs. policy first); presumably only the policy-first pair is live — confirm.
void Context::setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy)
void Context::setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes)
{
auto lock = getLock();
if (shared->mark_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mark cache has been already created.");
shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes, mark_cache_policy);
shared->mark_cache = std::make_shared<MarkCache>(mark_cache_policy, cache_size_in_bytes);
}
MarkCachePtr Context::getMarkCache() const

View File

@ -861,12 +861,12 @@ public:
void setSystemZooKeeperLogAfterInitializationIfNeeded();
/// Create a cache of uncompressed blocks of specified size. This can be done only once.
void setUncompressedCache(size_t max_size_in_bytes, const String & uncompressed_cache_policy);
void setUncompressedCache(const String & uncompressed_cache_policy, size_t max_size_in_bytes);
std::shared_ptr<UncompressedCache> getUncompressedCache() const;
void dropUncompressedCache() const;
/// Create a cache of marks of specified size. This can be done only once.
void setMarkCache(size_t cache_size_in_bytes, const String & mark_cache_policy);
void setMarkCache(const String & mark_cache_policy, size_t cache_size_in_bytes);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;
ThreadPool & getLoadMarksThreadpool() const;

View File

@ -40,8 +40,11 @@ private:
using Base = CacheBase<UInt128, MarksInCompressedFile, UInt128TrivialHash, MarksWeightFunction>;
public:
/// Constructor taking a byte limit and an optional policy name; forwards (size, 0, policy) to Base.
/// NOTE(review): this listing appears to interleave pre- and post-change lines of a diff. This overload would be
/// ambiguous with the size-only overload below for a one-argument call, so presumably only one of them is live — confirm.
explicit MarkCache(size_t max_size_in_bytes, const String & mark_cache_policy = "")
: Base(max_size_in_bytes, 0, mark_cache_policy) {}
/// Size-only constructor: passes just the byte limit to Base, no policy name is given.
explicit MarkCache(size_t max_size_in_bytes)
: Base(max_size_in_bytes) {}
/// Policy-first constructor: forwards the policy name and the byte limit to Base, in that order.
MarkCache(const String & mark_cache_policy, size_t max_size_in_bytes)
: Base(mark_cache_policy, max_size_in_bytes) {}
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file)

View File

@ -33,11 +33,11 @@ void StorageSystemQueryCache::fillData(MutableColumns & res_columns, ContextPtr
if (!query_cache)
return;
std::vector<QueryCache::Cache::KeyMapped> content = query_cache->dump();
const String & username = context->getUserName();
std::lock_guard lock(query_cache->mutex);
for (const auto & [key, result] : query_cache->cache)
for (const auto & [key, query_result] : content)
{
/// Showing other user's queries is considered a security risk
if (key.username.has_value() && key.username != username)
@ -48,7 +48,7 @@ void StorageSystemQueryCache::fillData(MutableColumns & res_columns, ContextPtr
res_columns[2]->insert(std::chrono::system_clock::to_time_t(key.expires_at));
res_columns[3]->insert(key.expires_at < std::chrono::system_clock::now());
res_columns[4]->insert(!key.username.has_value());
res_columns[5]->insert(result.sizeInBytes());
res_columns[5]->insert(QueryCache::QueryResultWeight()(*query_result));
}
}