Add loading of mark cache policy

This commit is contained in:
alexX512 2022-06-14 07:08:40 +00:00
parent 5bed94a170
commit 2be72560f4
7 changed files with 11 additions and 40 deletions

View File

@ -564,9 +564,10 @@ void LocalServer::processConfig()
global_context->setUncompressedCache(uncompressed_cache_size, uncompressed_cache_policy);
/// Size of cache for marks (index of MergeTree family of tables).
String mark_cache_policy = config().getString("mark_cache_policy", "");
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
if (mark_cache_size)
global_context->setMarkCache(mark_cache_size);
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);

View File

@ -1363,6 +1363,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Size of cache for uncompressed blocks. Zero means disabled.
String uncompressed_cache_policy = config().getString("uncompressed_cache_policy", "");
LOG_INFO(log, "Uncompressed cache policy name {}", uncompressed_cache_policy);
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
if (uncompressed_cache_size > max_cache_size)
{
@ -1389,6 +1390,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Size of cache for marks (index of MergeTree family of tables).
size_t mark_cache_size = config().getUInt64("mark_cache_size", 5368709120);
String mark_cache_policy = config().getString("mark_cache_policy", "");
if (!mark_cache_size)
LOG_ERROR(log, "Too low mark cache size will lead to severe performance degradation.");
if (mark_cache_size > max_cache_size)
@ -1397,7 +1399,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_INFO(log, "Mark cache size was lowered to {} because the system has low amount of memory",
formatReadableSizeWithBinarySuffix(mark_cache_size));
}
global_context->setMarkCache(mark_cache_size);
global_context->setMarkCache(mark_cache_size, mark_cache_policy);
/// Size of cache for uncompressed blocks of MergeTree indices. Zero means disabled.
size_t index_uncompressed_cache_size = config().getUInt64("index_uncompressed_cache_size", 0);

View File

@ -31,7 +31,7 @@ public:
using Mapped = TMapped;
using MappedPtr = std::shared_ptr<Mapped>;
CacheBase(size_t max_size, size_t max_elements_size = 0) : CacheBase("SLRU", max_size, max_elements_size) {}
CacheBase(size_t max_size, size_t max_elements_size = 0) : CacheBase(default_cache_policy_name, max_size, max_elements_size) {}
/// TODO: Rewrite "Args... args" to custom struct with fields for all cache policies.
template <class... Args>
@ -44,7 +44,6 @@ public:
cache_policy_name = default_cache_policy_name;
}
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Cache policy name \"{}\"", cache_policy_name);
if (cache_policy_name == "LRU")
{
using LRUPolicy = LRUCachePolicy<TKey, TMapped, HashFunction, WeightFunction>;
@ -65,12 +64,6 @@ public:
MappedPtr get(const Key & key)
{
std::lock_guard lock(mutex);
auto weight = cache_policy->weight(lock);
auto max_weight = cache_policy->maxSize();
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Info before get. Weight: {}. Max weight: {}", weight, max_weight);
assert(weight <= max_weight);
auto res = cache_policy->get(key, lock);
if (res)
++hits;
@ -83,12 +76,6 @@ public:
void set(const Key & key, const MappedPtr & mapped)
{
std::lock_guard lock(mutex);
auto weight = cache_policy->weight(lock);
auto max_weight = cache_policy->maxSize();
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Info before set. Weight: {}. Max weight: {}", weight, max_weight);
assert(weight <= max_weight);
cache_policy->set(key, mapped, lock);
}
@ -106,12 +93,6 @@ public:
InsertTokenHolder token_holder;
{
std::lock_guard cache_lock(mutex);
auto weight = cache_policy->weight(cache_lock);
auto max_weight = cache_policy->maxSize();
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Info before getOrSet. Weight: {}. Max weight: {}", weight, max_weight);
assert(weight <= max_weight);
auto val = cache_policy->get(key, cache_lock);
if (val)
{
@ -170,12 +151,6 @@ public:
void reset()
{
std::lock_guard lock(mutex);
auto weight = cache_policy->weight(lock);
auto max_weight = cache_policy->maxSize();
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Info before reset. Weight: {}. Max weight: {}", weight, max_weight);
assert(weight <= max_weight);
insert_tokens.clear();
hits = 0;
misses = 0;
@ -185,12 +160,6 @@ public:
void remove(const Key & key)
{
std::lock_guard lock(mutex);
auto weight = cache_policy->weight(lock);
auto max_weight = cache_policy->maxSize();
LOG_DEBUG(&Poco::Logger::get("CacheBase"), "Info before remove. Weight: {}. Max weight: {}", weight, max_weight);
assert(weight <= max_weight);
cache_policy->remove(key, lock);
}

View File

@ -97,7 +97,6 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
M(String, uncompressed_cache_policy, "", "Which cache policy use for the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \

View File

@ -1683,14 +1683,14 @@ void Context::dropUncompressedCache() const
}
void Context::setMarkCache(size_t cache_size_in_bytes)
void Context::setMarkCache(size_t cache_size_in_bytes, String mark_cache_policy)
{
auto lock = getLock();
if (shared->mark_cache)
throw Exception("Mark cache has been already created.", ErrorCodes::LOGICAL_ERROR);
shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes);
shared->mark_cache = std::make_shared<MarkCache>(cache_size_in_bytes, mark_cache_policy);
}
MarkCachePtr Context::getMarkCache() const

View File

@ -785,7 +785,7 @@ public:
void dropUncompressedCache() const;
/// Create a cache of marks of specified size. This can be done only once.
void setMarkCache(size_t cache_size_in_bytes);
void setMarkCache(size_t cache_size_in_bytes, String mark_cache_policy);
std::shared_ptr<MarkCache> getMarkCache() const;
void dropMarkCache() const;

View File

@ -40,8 +40,8 @@ private:
using Base = CacheBase<UInt128, MarksInCompressedFile, UInt128TrivialHash, MarksWeightFunction>;
public:
explicit MarkCache(size_t max_size_in_bytes)
: Base(max_size_in_bytes) {}
explicit MarkCache(size_t max_size_in_bytes, String mark_cache_policy = "")
: Base(mark_cache_policy, max_size_in_bytes) {}
/// Calculate key from path to file and offset.
static UInt128 hash(const String & path_to_file)