Merge pull request #57076 from ClickHouse/slru-for-filesystem-cache

Implement SLRU cache policy for filesystem cache
This commit is contained in:
Kseniia Sumarokova 2023-12-12 10:20:58 +01:00 committed by GitHub
commit 91d36ad224
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1305 additions and 431 deletions

View File

@ -24,6 +24,22 @@ azurite-blob --blobHost 0.0.0.0 --blobPort 10000 --debug /azurite_log &
config_logs_export_cluster /etc/clickhouse-server/config.d/system_logs_export.yaml
cache_policy=""
if [ $(( $(date +%-d) % 2 )) -eq 1 ]; then
cache_policy="SLRU"
else
cache_policy="LRU"
fi
echo "Using cache policy: $cache_policy"
if [ "$cache_policy" = "SLRU" ]; then
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<cache_policy>LRU</cache_policy>|<cache_policy>SLRU</cache_policy>|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
fi
function start()
{
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then

View File

@ -65,9 +65,27 @@ chmod 777 -R /var/lib/clickhouse
clickhouse-client --query "ATTACH DATABASE IF NOT EXISTS datasets ENGINE = Ordinary"
clickhouse-client --query "CREATE DATABASE IF NOT EXISTS test"
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# Randomize cache policies.
cache_policy=""
if [ $(( $(date +%-d) % 2 )) -eq 1 ]; then
cache_policy="SLRU"
else
cache_policy="LRU"
fi
echo "Using cache policy: $cache_policy"
if [ "$cache_policy" = "SLRU" ]; then
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<cache_policy>LRU</cache_policy>|<cache_policy>SLRU</cache_policy>|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
fi
start
clickhouse-client --query "SHOW TABLES FROM datasets"
@ -191,6 +209,13 @@ sudo cat /etc/clickhouse-server/config.d/logger_trace.xml \
> /etc/clickhouse-server/config.d/logger_trace.xml.tmp
mv /etc/clickhouse-server/config.d/logger_trace.xml.tmp /etc/clickhouse-server/config.d/logger_trace.xml
if [ "$cache_policy" = "SLRU" ]; then
sudo cat /etc/clickhouse-server/config.d/storage_conf.xml \
| sed "s|<cache_policy>LRU</cache_policy>|<cache_policy>SLRU</cache_policy>|" \
> /etc/clickhouse-server/config.d/storage_conf.xml.tmp
mv /etc/clickhouse-server/config.d/storage_conf.xml.tmp /etc/clickhouse-server/config.d/storage_conf.xml
fi
# Randomize async_load_databases
if [ $(( $(date +%-d) % 2 )) -eq 1 ]; then
sudo echo "<clickhouse><async_load_databases>true</async_load_databases></clickhouse>" \

View File

@ -540,7 +540,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
return false;
current_file_segment = &file_segments->front();
current_file_segment->use();
current_file_segment->increasePriority();
implementation_buffer = getImplementationBuffer(*current_file_segment);
LOG_TEST(
@ -868,7 +868,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
else
{
implementation_buffer = getImplementationBuffer(file_segments->front());
file_segments->front().use();
file_segments->front().increasePriority();
}
chassert(!internal_buffer.empty());

View File

@ -0,0 +1,72 @@
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Cache/Metadata.h>
namespace ProfileEvents
{
extern const Event FilesystemCacheEvictMicroseconds;
extern const Event FilesystemCacheEvictedBytes;
extern const Event FilesystemCacheEvictedFileSegments;
}
namespace DB
{
EvictionCandidates::~EvictionCandidates()
{
for (const auto & [key, key_candidates] : candidates)
{
for (const auto & candidate : key_candidates.candidates)
candidate->removal_candidate = false;
}
}
void EvictionCandidates::add(LockedKey & locked_key, const FileSegmentMetadataPtr & candidate)
{
auto [it, inserted] = candidates.emplace(locked_key.getKey(), KeyCandidates{});
if (inserted)
it->second.key_metadata = locked_key.getKeyMetadata();
it->second.candidates.push_back(candidate);
candidate->removal_candidate = true;
++candidates_size;
}
void EvictionCandidates::evict(FileCacheQueryLimit::QueryContext * query_context, const CacheGuard::Lock & lock)
{
if (candidates.empty())
return;
auto timer = DB::CurrentThread::getProfileEvents().timer(ProfileEvents::FilesystemCacheEvictMicroseconds);
for (auto & [key, key_candidates] : candidates)
{
auto locked_key = key_candidates.key_metadata->tryLock();
if (!locked_key)
continue; /// key could become invalid after we released the key lock above, just skip it.
auto & to_evict = key_candidates.candidates;
while (!to_evict.empty())
{
auto & candidate = to_evict.back();
chassert(candidate->releasable());
const auto segment = candidate->file_segment;
auto queue_it = segment->getQueueIterator();
chassert(queue_it);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->range().size());
locked_key->removeFileSegment(segment->offset(), segment->lock());
queue_it->remove(lock);
if (query_context)
query_context->remove(segment->key(), segment->offset(), lock);
to_evict.pop_back();
}
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Interpreters/Cache/QueryLimit.h>
namespace DB
{
class EvictionCandidates
{
public:
~EvictionCandidates();
void add(LockedKey & locked_key, const FileSegmentMetadataPtr & candidate);
void evict(FileCacheQueryLimit::QueryContext * query_context, const CacheGuard::Lock &);
size_t size() const { return candidates_size; }
auto begin() const { return candidates.begin(); }
auto end() const { return candidates.end(); }
private:
struct KeyCandidates
{
KeyMetadataPtr key_metadata;
std::vector<FileSegmentMetadataPtr> candidates;
};
std::unordered_map<FileCacheKey, KeyCandidates> candidates;
size_t candidates_size = 0;
};
using EvictionCandidatesPtr = std::unique_ptr<EvictionCandidates>;
}

View File

@ -7,6 +7,8 @@
#include <IO/WriteBufferFromString.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <Common/ThreadPool.h>
@ -20,13 +22,8 @@ namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event FilesystemCacheLoadMetadataMicroseconds;
extern const Event FilesystemCacheEvictedBytes;
extern const Event FilesystemCacheEvictedFileSegments;
extern const Event FilesystemCacheEvictionSkippedFileSegments;
extern const Event FilesystemCacheEvictionTries;
extern const Event FilesystemCacheLockCacheMicroseconds;
extern const Event FilesystemCacheReserveMicroseconds;
extern const Event FilesystemCacheEvictMicroseconds;
extern const Event FilesystemCacheGetOrSetMicroseconds;
extern const Event FilesystemCacheGetMicroseconds;
}
@ -51,6 +48,28 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
void FileCacheReserveStat::update(size_t size, FileSegmentKind kind, bool releasable)
{
auto & local_stat = stat_by_kind[kind];
if (releasable)
{
stat.releasable_size += size;
++stat.releasable_count;
local_stat.releasable_size += size;
++local_stat.releasable_count;
}
else
{
stat.non_releasable_size += size;
++stat.non_releasable_count;
local_stat.non_releasable_size += size;
++local_stat.non_releasable_count;
}
}
FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings)
@ -62,7 +81,14 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
, log(&Poco::Logger::get("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit)
{
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
if (settings.cache_policy == "LRU")
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
else if (settings.cache_policy == "SLRU")
main_priority = std::make_unique<SLRUFileCachePriority>(settings.max_size, settings.max_elements, settings.slru_size_ratio);
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown cache policy: {}", settings.cache_policy);
LOG_DEBUG(log, "Using {} cache policy", settings.cache_policy);
if (settings.cache_hits_threshold)
stash = std::make_unique<HitsCountStash>(settings.cache_hits_threshold, settings.max_elements);
@ -693,7 +719,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
}
else
{
result_state = record_it->second->use(*lock) >= stash->hits_threshold
result_state = record_it->second->increasePriority(*lock) >= stash->hits_threshold
? FileSegment::State::EMPTY
: FileSegment::State::DETACHED;
}
@ -754,177 +780,34 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size, FileCa
file_segment.key(), file_segment.offset());
}
struct EvictionCandidates
{
explicit EvictionCandidates(KeyMetadataPtr key_metadata_) : key_metadata(std::move(key_metadata_)) {}
void add(const FileSegmentMetadataPtr & candidate)
{
candidate->removal_candidate = true;
candidates.push_back(candidate);
}
~EvictionCandidates()
{
/// If failed to reserve space, we don't delete the candidates but drop the flag instead
/// so the segments can be used again
for (const auto & candidate : candidates)
candidate->removal_candidate = false;
}
KeyMetadataPtr key_metadata;
std::vector<FileSegmentMetadataPtr> candidates;
};
std::unordered_map<Key, EvictionCandidates> to_delete;
size_t freeable_space = 0, freeable_count = 0;
auto iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
chassert(segment_metadata->file_segment->assertCorrectness());
auto & stat_by_kind = reserve_stat.stat_by_kind[segment_metadata->file_segment->getKind()];
if (segment_metadata->releasable())
{
const auto & key = segment_metadata->file_segment->key();
auto it = to_delete.find(key);
if (it == to_delete.end())
it = to_delete.emplace(key, locked_key.getKeyMetadata()).first;
it->second.add(segment_metadata);
stat_by_kind.releasable_size += segment_metadata->size();
++stat_by_kind.releasable_count;
freeable_space += segment_metadata->size();
++freeable_count;
}
else
{
stat_by_kind.non_releasable_size += segment_metadata->size();
++stat_by_kind.non_releasable_count;
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedFileSegments);
}
return PriorityIterationResult::CONTINUE;
};
EvictionCandidates eviction_candidates;
IFileCachePriority::FinalizeEvictionFunc finalize_eviction_func;
if (query_priority)
{
auto is_query_priority_overflow = [&]
{
const size_t new_size = query_priority->getSize(cache_lock) + size - freeable_space;
return new_size > query_priority->getSizeLimit();
};
if (is_query_priority_overflow())
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
query_priority->iterate(
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{ return is_query_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
cache_lock);
if (is_query_priority_overflow())
return false;
}
LOG_TEST(
log, "Query limits satisfied (while reserving for {}:{})",
file_segment.key(), file_segment.offset());
}
auto is_main_priority_overflow = [main_priority_size_limit = main_priority->getSizeLimit(),
main_priority_elements_limit = main_priority->getElementsLimit(),
size,
&freeable_space,
&freeable_count,
&file_segment,
&cache_lock,
my_main_priority = this->main_priority.get(),
my_log = this->log]
{
const bool is_overflow =
/// size_limit == 0 means unlimited cache size
(main_priority_size_limit != 0 && (my_main_priority->getSize(cache_lock) + size - freeable_space > main_priority_size_limit))
/// elements_limit == 0 means unlimited number of cache elements
|| (main_priority_elements_limit != 0 && freeable_count == 0
&& my_main_priority->getElementsCount(cache_lock) == main_priority_elements_limit);
LOG_TEST(
my_log, "Overflow: {}, size: {}, ready to remove: {} ({} in number), current cache size: {}/{}, elements: {}/{}, while reserving for {}:{}",
is_overflow, size, freeable_space, freeable_count,
my_main_priority->getSize(cache_lock), my_main_priority->getSizeLimit(),
my_main_priority->getElementsCount(cache_lock), my_main_priority->getElementsLimit(),
file_segment.key(), file_segment.offset());
return is_overflow;
};
/// If we have enough space in query_priority, we are not interested about stat there anymore.
/// Clean the stat before iterating main_priority to avoid calculating any segment stat twice.
reserve_stat.stat_by_kind.clear();
if (is_main_priority_overflow())
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
main_priority->iterate(
[&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{ return is_main_priority_overflow() ? iterate_func(locked_key, segment_metadata) : PriorityIterationResult::BREAK; },
cache_lock);
if (is_main_priority_overflow())
if (!query_priority->collectCandidatesForEviction(size, reserve_stat, eviction_candidates, {}, finalize_eviction_func, cache_lock))
return false;
LOG_TEST(log, "Query limits satisfied (while reserving for {}:{})", file_segment.key(), file_segment.offset());
/// If we have enough space in query_priority, we are not interested about stat there anymore.
/// Clean the stat before iterating main_priority to avoid calculating any segment stat twice.
reserve_stat.stat_by_kind.clear();
}
/// A file_segment_metadata acquires a priority iterator on first successful space reservation attempt,
auto queue_iterator = file_segment.getQueueIterator();
chassert(!queue_iterator || file_segment.getReservedSize() > 0);
if (!main_priority->collectCandidatesForEviction(size, reserve_stat, eviction_candidates, queue_iterator, finalize_eviction_func, cache_lock))
return false;
if (!file_segment.getKeyMetadata()->createBaseDirectory())
return false;
if (!to_delete.empty())
{
LOG_DEBUG(
log, "Will evict {} file segments (while reserving {} bytes for {}:{})",
to_delete.size(), size, file_segment.key(), file_segment.offset());
eviction_candidates.evict(query_context.get(), cache_lock);
ProfileEventTimeIncrement<Microseconds> evict_watch(ProfileEvents::FilesystemCacheEvictMicroseconds);
for (auto & [current_key, deletion_info] : to_delete)
{
auto locked_key = deletion_info.key_metadata->tryLock();
if (!locked_key)
continue; /// key could become invalid after we released the key lock above, just skip it.
/// delete from vector in reverse order just for efficiency
auto & candidates = deletion_info.candidates;
while (!candidates.empty())
{
auto & candidate = candidates.back();
chassert(candidate->releasable());
const auto * segment = candidate->file_segment.get();
auto queue_it = segment->getQueueIterator();
chassert(queue_it);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedFileSegments);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictedBytes, segment->range().size());
locked_key->removeFileSegment(segment->offset(), segment->lock());
queue_it->remove(cache_lock);
if (query_context)
query_context->remove(current_key, segment->offset(), cache_lock);
candidates.pop_back();
}
}
}
/// A file_segment_metadata acquires a LRUQueue iterator on first successful space reservation attempt,
/// e.g. queue_iteratir is std::nullopt here if no space has been reserved yet.
auto queue_iterator = file_segment.getQueueIterator();
chassert(!queue_iterator || file_segment.getReservedSize() > 0);
if (finalize_eviction_func)
finalize_eviction_func(cache_lock);
if (queue_iterator)
{
@ -994,8 +877,7 @@ void FileCache::removeAllReleasable()
{
/// Remove all access information.
auto lock = lockCache();
stash->records.clear();
stash->queue->removeAll(lock);
stash->clear();
}
}
@ -1140,9 +1022,6 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)
const auto key = Key::fromKeyString(key_directory.filename().string());
auto key_metadata = metadata.getKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
const size_t size_limit = main_priority->getSizeLimit();
const size_t elements_limit = main_priority->getElementsLimit();
for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)
{
auto offset_with_suffix = offset_it->path().filename().string();
@ -1182,14 +1061,14 @@ void FileCache::loadMetadataForKeys(const fs::path & keys_dir)
}
bool limits_satisfied;
IFileCachePriority::Iterator cache_it;
IFileCachePriority::IteratorPtr cache_it;
{
auto lock = lockCache();
limits_satisfied = (size_limit == 0 || main_priority->getSize(lock) + size <= size_limit)
&& (elements_limit == 0 || main_priority->getElementsCount(lock) + 1 <= elements_limit);
limits_satisfied = main_priority->canFit(size, lock);
if (limits_satisfied)
cache_it = main_priority->add(key_metadata, offset, size, lock);
cache_it = main_priority->add(key_metadata, offset, size, lock, /* is_startup */true);
/// TODO: we can get rid of this lockCache() if we first load everything in parallel
/// without any mutual lock between loading threads, and only after do removeOverflow().
@ -1297,15 +1176,7 @@ std::vector<FileSegment::Info> FileCache::getFileSegmentInfos(const Key & key)
std::vector<FileSegment::Info> FileCache::dumpQueue()
{
assertInitialized();
std::vector<FileSegment::Info> file_segments;
main_priority->iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{
file_segments.push_back(FileSegment::getInfo(segment_metadata->file_segment, *this));
return PriorityIterationResult::CONTINUE;
}, lockCache());
return file_segments;
return main_priority->dump(*this, lockCache());
}
std::vector<String> FileCache::tryGetCachePaths(const Key & key)
@ -1392,4 +1263,17 @@ std::vector<FileSegment::Info> FileCache::sync()
return file_segments;
}
FileCache::HitsCountStash::HitsCountStash(size_t hits_threashold_, size_t queue_size_)
: hits_threshold(hits_threashold_), queue_size(queue_size_), queue(std::make_unique<LRUFileCachePriority>(0, queue_size_))
{
if (!queue_size_)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Queue size for hits queue must be non-zero");
}
void FileCache::HitsCountStash::clear()
{
records.clear();
queue = std::make_unique<LRUFileCachePriority>(0, queue_size);
}
}

View File

@ -22,25 +22,23 @@
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
/// Track acquired space in cache during reservation
/// to make error messages when no space left more informative.
struct FileCacheReserveStat
{
struct Stat
{
size_t releasable_size;
size_t releasable_count;
size_t releasable_size = 0;
size_t releasable_count = 0;
size_t non_releasable_size;
size_t non_releasable_count;
size_t non_releasable_size = 0;
size_t non_releasable_count = 0;
};
Stat stat;
std::unordered_map<FileSegmentKind, Stat> stat_by_kind;
void update(size_t size, FileSegmentKind kind, bool releasable);
};
/// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments.
@ -52,8 +50,6 @@ public:
using QueryLimit = DB::FileCacheQueryLimit;
using Priority = IFileCachePriority;
using PriorityEntry = IFileCachePriority::Entry;
using PriorityIterator = IFileCachePriority::Iterator;
using PriorityIterationResult = IFileCachePriority::IterationResult;
FileCache(const std::string & cache_name, const FileCacheSettings & settings);
@ -177,16 +173,14 @@ private:
struct HitsCountStash
{
HitsCountStash(size_t hits_threashold_, size_t queue_size_)
: hits_threshold(hits_threashold_), queue(std::make_unique<LRUFileCachePriority>(0, queue_size_))
{
if (!queue_size_)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Queue size for hits queue must be non-zero");
}
HitsCountStash(size_t hits_threashold_, size_t queue_size_);
void clear();
const size_t hits_threshold;
FileCachePriorityPtr queue;
using Records = std::unordered_map<KeyAndOffset, PriorityIterator, FileCacheKeyAndOffsetHash>;
const size_t queue_size;
std::unique_ptr<LRUFileCachePriority> queue;
using Records = std::unordered_map<KeyAndOffset, Priority::IteratorPtr, FileCacheKeyAndOffsetHash>;
Records records;
};

View File

@ -3,6 +3,7 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Common/Exception.h>
#include <Common/NamedCollections/NamedCollections.h>
#include <boost/algorithm/string/case_conv.hpp>
#include <IO/ReadHelpers.h>
namespace DB
@ -13,7 +14,7 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetString get_string)
void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetString get_string, FuncGetDouble get_double)
{
auto config_parse_size = [&](std::string_view key) { return parseWithSizeSuffix<uint64_t>(get_string(key)); };
@ -64,6 +65,15 @@ void FileCacheSettings::loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetStrin
if (boundary_alignment > max_file_segment_size)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Setting `boundary_alignment` cannot exceed `max_file_segment_size`");
if (has("cache_policy"))
{
cache_policy = get_string("cache_policy");
boost::to_upper(cache_policy);
}
if (has("slru_size_ratio"))
slru_size_ratio = get_double("slru_size_ratio");
}
void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
@ -71,15 +81,17 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
auto config_has = [&](std::string_view key) { return config.has(fmt::format("{}.{}", config_prefix, key)); };
auto config_get_uint = [&](std::string_view key) { return config.getUInt(fmt::format("{}.{}", config_prefix, key)); };
auto config_get_string = [&](std::string_view key) { return config.getString(fmt::format("{}.{}", config_prefix, key)); };
loadImpl(std::move(config_has), std::move(config_get_uint), std::move(config_get_string));
auto config_get_double = [&](std::string_view key) { return config.getDouble(fmt::format("{}.{}", config_prefix, key)); };
loadImpl(std::move(config_has), std::move(config_get_uint), std::move(config_get_string), std::move(config_get_double));
}
void FileCacheSettings::loadFromCollection(const NamedCollection & collection)
{
auto config_has = [&](std::string_view key) { return collection.has(std::string(key)); };
auto config_get_uint = [&](std::string_view key) { return collection.get<UInt64>(std::string(key)); };
auto config_get_string = [&](std::string_view key) { return collection.get<String>(std::string(key)); };
loadImpl(std::move(config_has), std::move(config_get_uint), std::move(config_get_string));
auto collection_has = [&](std::string_view key) { return collection.has(std::string(key)); };
auto collection_get_uint = [&](std::string_view key) { return collection.get<UInt64>(std::string(key)); };
auto collection_get_string = [&](std::string_view key) { return collection.get<String>(std::string(key)); };
auto collection_get_double = [&](std::string_view key) { return collection.get<Float64>(std::string(key)); };
loadImpl(std::move(collection_has), std::move(collection_get_uint), std::move(collection_get_string), std::move(collection_get_double));
}
}

View File

@ -32,6 +32,9 @@ struct FileCacheSettings
size_t load_metadata_threads = FILECACHE_DEFAULT_LOAD_METADATA_THREADS;
std::string cache_policy = "LRU";
double slru_size_ratio = 0.5;
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection);
@ -39,7 +42,8 @@ private:
using FuncHas = std::function<bool(std::string_view)>;
using FuncGetUInt = std::function<size_t(std::string_view)>;
using FuncGetString = std::function<std::string(std::string_view)>;
void loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetString get_string);
using FuncGetDouble = std::function<double(std::string_view)>;
void loadImpl(FuncHas has, FuncGetUInt get_uint, FuncGetString get_string, FuncGetDouble get_double);
};
}

View File

@ -54,7 +54,7 @@ FileSegment::FileSegment(
bool background_download_enabled_,
FileCache * cache_,
std::weak_ptr<KeyMetadata> key_metadata_,
Priority::Iterator queue_iterator_)
Priority::IteratorPtr queue_iterator_)
: file_key(key_)
, segment_range(offset_, offset_ + size_ - 1)
, segment_kind(settings.kind)
@ -146,13 +146,13 @@ size_t FileSegment::getReservedSize() const
return reserved_size;
}
FileSegment::Priority::Iterator FileSegment::getQueueIterator() const
FileSegment::Priority::IteratorPtr FileSegment::getQueueIterator() const
{
auto lock = lockFileSegment();
return queue_iterator;
}
void FileSegment::setQueueIterator(Priority::Iterator iterator)
void FileSegment::setQueueIterator(Priority::IteratorPtr iterator)
{
auto lock = lockFileSegment();
if (queue_iterator)
@ -480,7 +480,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
bool is_file_segment_size_exceeded;
{
auto lock = segment_guard.lock();
auto lock = lockFileSegment();
assertNotDetachedUnlocked(lock);
assertIsDownloaderUnlocked("reserve", lock);
@ -773,7 +773,7 @@ bool FileSegment::assertCorrectness() const
bool FileSegment::assertCorrectnessUnlocked(const FileSegmentGuard::Lock &) const
{
auto check_iterator = [this](const Priority::Iterator & it)
auto check_iterator = [this](const Priority::IteratorPtr & it)
{
UNUSED(this);
if (!it)
@ -849,6 +849,7 @@ FileSegment::Info FileSegment::getInfo(const FileSegmentPtr & file_segment, File
.cache_hits = file_segment->hits_count,
.references = static_cast<uint64_t>(file_segment.use_count()),
.is_unbound = file_segment->is_unbound,
.queue_entry_type = file_segment->queue_iterator ? file_segment->queue_iterator->getType() : QueueEntryType::None,
};
}
@ -904,7 +905,7 @@ void FileSegment::detach(const FileSegmentGuard::Lock & lock, const LockedKey &)
setDetachedState(lock);
}
void FileSegment::use()
void FileSegment::increasePriority()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FileSegmentUseMicroseconds);
@ -918,7 +919,7 @@ void FileSegment::use()
if (it)
{
auto cache_lock = cache->lockCache();
hits_count = it->use(cache_lock);
hits_count = it->increasePriority(cache_lock);
}
}

View File

@ -11,8 +11,8 @@
#include <IO/OpenedFileCache.h>
#include <base/getThreadId.h>
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileSegmentInfo.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <queue>
namespace Poco { class Logger; }
@ -28,23 +28,6 @@ namespace DB
class ReadBufferFromFileBase;
struct FileCacheReserveStat;
/*
* FileSegmentKind is used to specify the eviction policy for file segments.
*/
enum class FileSegmentKind
{
/* `Regular` file segment is still in cache after usage, and can be evicted
* (unless there're some holders).
*/
Regular,
/* `Temporary` file segment is removed right after releasing.
* Also corresponding files are removed during cache loading (if any).
*/
Temporary,
};
String toString(FileSegmentKind kind);
struct CreateFileSegmentSettings
{
@ -69,40 +52,9 @@ public:
using Downloader = std::string;
using DownloaderId = std::string;
using Priority = IFileCachePriority;
enum class State
{
DOWNLOADED,
/**
* When file segment is first created and returned to user, it has state EMPTY.
* EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully
* by any owner of EMPTY state file segment.
*/
EMPTY,
/**
* A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
* because each cache user might acquire multiple file segments and read them one by one,
* so only user which actually needs to read this segment earlier than others - becomes a downloader.
*/
DOWNLOADING,
/**
* Space reservation for a file segment is incremental, i.e. downloader reads buffer_size bytes
* from remote fs -> tries to reserve buffer_size bytes to put them to cache -> writes to cache
* on successful reservation and stops cache write otherwise. Those, who waited for the same file
* segment, will read downloaded part from cache and remaining part directly from remote fs.
*/
PARTIALLY_DOWNLOADED_NO_CONTINUATION,
/**
* If downloader did not finish download of current file segment for any reason apart from running
* out of cache space, then download can be continued by other owners of this file segment.
*/
PARTIALLY_DOWNLOADED,
/**
* If file segment cannot possibly be downloaded (first space reservation attempt failed), mark
* this file segment as out of cache scope.
*/
DETACHED,
};
using State = FileSegmentState;
using Info = FileSegmentInfo;
using QueueEntryType = FileCacheQueueEntryType;
FileSegment(
const Key & key_,
@ -113,7 +65,7 @@ public:
bool background_download_enabled_ = false,
FileCache * cache_ = nullptr,
std::weak_ptr<KeyMetadata> key_metadata_ = std::weak_ptr<KeyMetadata>(),
Priority::Iterator queue_iterator_ = Priority::Iterator{});
Priority::IteratorPtr queue_iterator_ = nullptr);
~FileSegment() = default;
@ -205,22 +157,7 @@ public:
/// exception.
void detach(const FileSegmentGuard::Lock &, const LockedKey &);
struct Info
{
FileSegment::Key key;
size_t offset;
std::string path;
uint64_t range_left;
uint64_t range_right;
FileSegmentKind kind;
State state;
uint64_t size;
uint64_t downloaded_size;
uint64_t cache_hits;
uint64_t references;
bool is_unbound;
};
static Info getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
static FileSegmentInfo getInfo(const FileSegmentPtr & file_segment, FileCache & cache);
bool isDetached() const;
@ -228,7 +165,7 @@ public:
/// is not going to be changed. Completed states: DOWNALODED, DETACHED.
bool isCompleted(bool sync = false) const;
void use();
void increasePriority();
/**
* ========== Methods used by `cache` ========================
@ -236,9 +173,9 @@ public:
FileSegmentGuard::Lock lock() const { return segment_guard.lock(); }
Priority::Iterator getQueueIterator() const;
Priority::IteratorPtr getQueueIterator() const;
void setQueueIterator(Priority::Iterator iterator);
void setQueueIterator(Priority::IteratorPtr iterator);
KeyMetadataPtr tryGetKeyMetadata() const;
@ -326,7 +263,7 @@ private:
mutable FileSegmentGuard segment_guard;
std::weak_ptr<KeyMetadata> key_metadata;
mutable Priority::Iterator queue_iterator; /// Iterator is put here on first reservation attempt, if successful.
mutable Priority::IteratorPtr queue_iterator; /// Iterator is put here on first reservation attempt, if successful.
FileCache * cache;
std::condition_variable cv;

View File

@ -0,0 +1,82 @@
#pragma once
#include <Interpreters/Cache/FileCache_fwd.h>
#include <Interpreters/Cache/FileCacheKey.h>
namespace DB
{
enum class FileSegmentState
{
DOWNLOADED,
/**
* When file segment is first created and returned to user, it has state EMPTY.
* EMPTY state can become DOWNLOADING when getOrSetDownaloder is called successfully
* by any owner of EMPTY state file segment.
*/
EMPTY,
/**
* A newly created file segment never has DOWNLOADING state until call to getOrSetDownloader
* because each cache user might acquire multiple file segments and read them one by one,
* so only user which actually needs to read this segment earlier than others - becomes a downloader.
*/
DOWNLOADING,
/**
* Space reservation for a file segment is incremental, i.e. downloader reads buffer_size bytes
* from remote fs -> tries to reserve buffer_size bytes to put them to cache -> writes to cache
* on successful reservation and stops cache write otherwise. Those, who waited for the same file
* segment, will read downloaded part from cache and remaining part directly from remote fs.
*/
PARTIALLY_DOWNLOADED_NO_CONTINUATION,
/**
* If downloader did not finish download of current file segment for any reason apart from running
* out of cache space, then download can be continued by other owners of this file segment.
*/
PARTIALLY_DOWNLOADED,
/**
* If file segment cannot possibly be downloaded (first space reservation attempt failed), mark
* this file segment as out of cache scope.
*/
DETACHED,
};
enum class FileSegmentKind
{
/**
* `Regular` file segment is still in cache after usage, and can be evicted
* (unless there're some holders).
*/
Regular,
/**
* Temporary` file segment is removed right after releasing.
* Also corresponding files are removed during cache loading (if any).
*/
Temporary,
};
enum class FileCacheQueueEntryType
{
None,
LRU,
SLRU_Protected,
SLRU_Probationary,
};
std::string toString(FileSegmentKind kind);
struct FileSegmentInfo
{
FileCacheKey key;
size_t offset;
std::string path;
uint64_t range_left;
uint64_t range_right;
FileSegmentKind kind;
FileSegmentState state;
uint64_t size;
uint64_t downloaded_size;
uint64_t cache_hits;
uint64_t references;
bool is_unbound;
FileCacheQueueEntryType queue_entry_type;
};
}

View File

@ -0,0 +1,40 @@
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric FilesystemCacheSizeLimit;
}
namespace DB
{
IFileCachePriority::IFileCachePriority(size_t max_size_, size_t max_elements_)
: max_size(max_size_), max_elements(max_elements_)
{
CurrentMetrics::set(CurrentMetrics::FilesystemCacheSizeLimit, max_size_);
}
IFileCachePriority::Entry::Entry(
const Key & key_,
size_t offset_,
size_t size_,
KeyMetadataPtr key_metadata_)
: key(key_)
, offset(offset_)
, key_metadata(key_metadata_)
, size(size_)
{
}
IFileCachePriority::Entry::Entry(const Entry & other)
: key(other.key)
, offset(other.offset)
, key_metadata(other.key_metadata)
, size(other.size.load())
, hits(other.hits)
{
}
}

View File

@ -1,72 +1,57 @@
#pragma once
#include <memory>
#include <mutex>
#include <Core/Types.h>
#include <Common/Exception.h>
#include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileSegmentInfo.h>
#include <Interpreters/Cache/Guards.h>
#include <Interpreters/Cache/IFileCachePriority.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h>
namespace DB
{
struct FileCacheReserveStat;
class EvictionCandidates;
/// IFileCachePriority is used to maintain the priority of cached data.
class IFileCachePriority : private boost::noncopyable
{
public:
using Key = FileCacheKey;
using KeyAndOffset = FileCacheKeyAndOffset;
using QueueEntryType = FileCacheQueueEntryType;
struct Entry
{
Entry(const Key & key_, size_t offset_, size_t size_, KeyMetadataPtr key_metadata_)
: key(key_), offset(offset_), size(size_), key_metadata(key_metadata_) {}
Entry(const Entry & other)
: key(other.key), offset(other.offset), size(other.size.load()), hits(other.hits), key_metadata(other.key_metadata) {}
Entry(const Key & key_, size_t offset_, size_t size_, KeyMetadataPtr key_metadata_);
Entry(const Entry & other);
const Key key;
const size_t offset;
const KeyMetadataPtr key_metadata;
std::atomic<size_t> size;
size_t hits = 0;
const KeyMetadataPtr key_metadata;
};
/// Provides an iterator to traverse the cache priority. Under normal circumstances,
/// the iterator can only return the records that have been directly swapped out.
/// For example, in the LRU algorithm, it can traverse all records, but in the LRU-K, it
/// can only traverse the records in the low priority queue.
class IIterator
class Iterator
{
public:
virtual ~IIterator() = default;
virtual size_t use(const CacheGuard::Lock &) = 0;
virtual void remove(const CacheGuard::Lock &) = 0;
virtual ~Iterator() = default;
virtual const Entry & getEntry() const = 0;
virtual Entry & getEntry() = 0;
virtual size_t increasePriority(const CacheGuard::Lock &) = 0;
virtual void updateSize(int64_t size) = 0;
virtual void remove(const CacheGuard::Lock &) = 0;
virtual void invalidate() = 0;
virtual void updateSize(int64_t size) = 0;
virtual QueueEntryType getType() const = 0;
};
using IteratorPtr = std::shared_ptr<Iterator>;
using Iterator = std::shared_ptr<IIterator>;
using ConstIterator = std::shared_ptr<const IIterator>;
enum class IterationResult
{
BREAK,
CONTINUE,
REMOVE_AND_CONTINUE,
};
using IterateFunc = std::function<IterationResult(LockedKey &, const FileSegmentMetadataPtr &)>;
IFileCachePriority(size_t max_size_, size_t max_elements_) : max_size(max_size_), max_elements(max_elements_) {}
IFileCachePriority(size_t max_size_, size_t max_elements_);
virtual ~IFileCachePriority() = default;
@ -78,19 +63,30 @@ public:
virtual size_t getElementsCount(const CacheGuard::Lock &) const = 0;
virtual Iterator add(
KeyMetadataPtr key_metadata, size_t offset, size_t size, const CacheGuard::Lock &) = 0;
/// Throws exception if there is not enough size to fit it.
virtual IteratorPtr add( /// NOLINT
KeyMetadataPtr key_metadata,
size_t offset,
size_t size,
const CacheGuard::Lock &,
bool is_startup = false) = 0;
virtual void pop(const CacheGuard::Lock &) = 0;
virtual void removeAll(const CacheGuard::Lock &) = 0;
/// From lowest to highest priority.
virtual void iterate(IterateFunc && func, const CacheGuard::Lock &) = 0;
virtual bool canFit(size_t size, const CacheGuard::Lock &) const = 0;
virtual void shuffle(const CacheGuard::Lock &) = 0;
private:
virtual std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) = 0;
using FinalizeEvictionFunc = std::function<void(const CacheGuard::Lock & lk)>;
virtual bool collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock &) = 0;
protected:
const size_t max_size = 0;
const size_t max_elements = 0;
};

View File

@ -1,5 +1,6 @@
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <Common/logger_useful.h>
@ -11,6 +12,12 @@ namespace CurrentMetrics
extern const Metric FilesystemCacheElements;
}
namespace ProfileEvents
{
extern const Event FilesystemCacheEvictionSkippedFileSegments;
extern const Event FilesystemCacheEvictionTries;
}
namespace DB
{
@ -19,26 +26,31 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
IFileCachePriority::Iterator LRUFileCachePriority::add(
IFileCachePriority::IteratorPtr LRUFileCachePriority::add( /// NOLINT
KeyMetadataPtr key_metadata,
size_t offset,
size_t size,
const CacheGuard::Lock &)
const CacheGuard::Lock & lock,
bool /* is_startup */)
{
const auto & key = key_metadata->key;
if (size == 0)
return std::make_shared<LRUIterator>(add(Entry(key_metadata->key, offset, size, key_metadata), lock));
}
LRUFileCachePriority::LRUIterator LRUFileCachePriority::add(Entry && entry, const CacheGuard::Lock &)
{
if (entry.size == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding zero size entries to LRU queue is not allowed "
"(key: {}, offset: {})", key, offset);
"(key: {}, offset: {})", entry.key, entry.offset);
}
#ifndef NDEBUG
for (const auto & entry : queue)
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (entry.size != 0 && entry.key == key && entry.offset == offset)
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
@ -48,41 +60,27 @@ IFileCachePriority::Iterator LRUFileCachePriority::add(
#endif
const auto & size_limit = getSizeLimit();
if (size_limit && current_size + size > size_limit)
if (size_limit && current_size + entry.size > size_limit)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Not enough space to add {}:{} with size {}: current size: {}/{}",
key, offset, size, current_size, size_limit);
entry.key, entry.offset, entry.size, current_size, size_limit);
}
auto iter = queue.insert(queue.end(), Entry(key, offset, size, key_metadata));
auto iterator = queue.insert(queue.end(), entry);
updateSize(size);
updateSize(entry.size);
updateElementsCount(1);
LOG_TEST(
log, "Added entry into LRU queue, key: {}, offset: {}, size: {}",
key, offset, size);
entry.key, entry.offset, entry.size);
return std::make_shared<LRUFileCacheIterator>(this, iter);
return LRUIterator(this, iterator);
}
void LRUFileCachePriority::removeAll(const CacheGuard::Lock &)
{
LOG_TEST(log, "Removed all entries from LRU queue");
updateSize(-current_size);
updateElementsCount(-current_elements_num);
queue.clear();
}
void LRUFileCachePriority::pop(const CacheGuard::Lock &)
{
remove(queue.begin());
}
LRUFileCachePriority::LRUQueueIterator LRUFileCachePriority::remove(LRUQueueIterator it)
LRUFileCachePriority::LRUQueue::iterator LRUFileCachePriority::remove(LRUQueue::iterator it, const CacheGuard::Lock &)
{
/// If size is 0, entry is invalidated, current_elements_num was already updated.
if (it->size)
@ -110,30 +108,49 @@ void LRUFileCachePriority::updateElementsCount(int64_t num)
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements, num);
}
LRUFileCachePriority::LRUFileCacheIterator::LRUFileCacheIterator(
LRUFileCachePriority::LRUIterator::LRUIterator(
LRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUQueueIterator queue_iter_)
LRUQueue::iterator iterator_)
: cache_priority(cache_priority_)
, queue_iter(queue_iter_)
, iterator(iterator_)
{
}
void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &)
LRUFileCachePriority::LRUIterator::LRUIterator(const LRUIterator & other)
{
*this = other;
}
LRUFileCachePriority::LRUIterator & LRUFileCachePriority::LRUIterator::operator =(const LRUIterator & other)
{
if (this == &other)
return *this;
cache_priority = other.cache_priority;
iterator = other.iterator;
return *this;
}
bool LRUFileCachePriority::LRUIterator::operator ==(const LRUIterator & other) const
{
return cache_priority == other.cache_priority && iterator == other.iterator;
}
void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock & lock)
{
for (auto it = queue.begin(); it != queue.end();)
{
auto locked_key = it->key_metadata->tryLock();
if (!locked_key || it->size == 0)
{
it = remove(it);
it = remove(it, lock);
continue;
}
auto metadata = locked_key->tryGetByOffset(it->offset);
if (!metadata)
{
it = remove(it);
it = remove(it, lock);
continue;
}
@ -160,63 +177,167 @@ void LRUFileCachePriority::iterate(IterateFunc && func, const CacheGuard::Lock &
}
case IterationResult::REMOVE_AND_CONTINUE:
{
it = remove(it);
it = remove(it, lock);
break;
}
}
}
}
void LRUFileCachePriority::LRUFileCacheIterator::remove(const CacheGuard::Lock &)
bool LRUFileCachePriority::canFit(size_t size, const CacheGuard::Lock & lock) const
{
checkUsable();
cache_priority->remove(queue_iter);
queue_iter = LRUQueueIterator{};
return canFit(size, 0, 0, lock);
}
void LRUFileCachePriority::LRUFileCacheIterator::invalidate()
bool LRUFileCachePriority::canFit(
size_t size,
size_t released_size_assumption,
size_t released_elements_assumption,
const CacheGuard::Lock &) const
{
checkUsable();
return (max_size == 0 || (current_size + size - released_size_assumption <= max_size))
&& (max_elements == 0 || current_elements_num + 1 - released_elements_assumption <= max_elements);
}
bool LRUFileCachePriority::collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr,
FinalizeEvictionFunc &,
const CacheGuard::Lock & lock)
{
if (canFit(size, lock))
return true;
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionTries);
IterateFunc iterate_func = [&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
const auto & file_segment = segment_metadata->file_segment;
chassert(file_segment->assertCorrectness());
if (segment_metadata->releasable())
{
res.add(locked_key, segment_metadata);
stat.update(segment_metadata->size(), file_segment->getKind(), true);
}
else
{
stat.update(segment_metadata->size(), file_segment->getKind(), false);
ProfileEvents::increment(ProfileEvents::FilesystemCacheEvictionSkippedFileSegments);
}
return IterationResult::CONTINUE;
};
auto can_fit = [&]
{
return canFit(size, stat.stat.releasable_size, stat.stat.releasable_count, lock);
};
iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
return can_fit() ? IterationResult::BREAK : iterate_func(locked_key, segment_metadata);
}, lock);
return can_fit();
}
LRUFileCachePriority::LRUIterator LRUFileCachePriority::move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &)
{
const auto & entry = it.getEntry();
if (entry.size == 0)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Adding zero size entries to LRU queue is not allowed "
"(key: {}, offset: {})", entry.key, entry.offset);
}
#ifndef NDEBUG
for (const auto & queue_entry : queue)
{
/// entry.size == 0 means entry was invalidated.
if (queue_entry.size != 0 && queue_entry.key == entry.key && queue_entry.offset == entry.offset)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to add duplicate queue entry to queue. "
"(Key: {}, offset: {}, size: {})",
entry.key, entry.offset, entry.size);
}
#endif
queue.splice(queue.end(), other.queue, it.iterator);
updateSize(entry.size);
updateElementsCount(1);
other.updateSize(-entry.size);
other.updateElementsCount(-1);
return LRUIterator(this, it.iterator);
}
std::vector<FileSegmentInfo> LRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{
std::vector<FileSegmentInfo> res;
iterate([&](LockedKey &, const FileSegmentMetadataPtr & segment_metadata)
{
res.emplace_back(FileSegment::getInfo(segment_metadata->file_segment, cache));
return IterationResult::CONTINUE;
}, lock);
return res;
}
void LRUFileCachePriority::LRUIterator::remove(const CacheGuard::Lock & lock)
{
assertValid();
cache_priority->remove(iterator, lock);
iterator = LRUQueue::iterator{};
}
void LRUFileCachePriority::LRUIterator::invalidate()
{
assertValid();
LOG_TEST(
cache_priority->log,
"Invalidating entry in LRU queue. Key: {}, offset: {}, previous size: {}",
queue_iter->key, queue_iter->offset, queue_iter->size);
iterator->key, iterator->offset, iterator->size);
cache_priority->updateSize(-queue_iter->size);
cache_priority->updateSize(-iterator->size);
cache_priority->updateElementsCount(-1);
queue_iter->size = 0;
iterator->size = 0;
}
void LRUFileCachePriority::LRUFileCacheIterator::updateSize(int64_t size)
void LRUFileCachePriority::LRUIterator::updateSize(int64_t size)
{
checkUsable();
assertValid();
LOG_TEST(
cache_priority->log,
"Update size with {} in LRU queue for key: {}, offset: {}, previous size: {}",
size, queue_iter->key, queue_iter->offset, queue_iter->size);
size, iterator->key, iterator->offset, iterator->size);
cache_priority->updateSize(size);
queue_iter->size += size;
iterator->size += size;
}
size_t LRUFileCachePriority::LRUFileCacheIterator::use(const CacheGuard::Lock &)
size_t LRUFileCachePriority::LRUIterator::increasePriority(const CacheGuard::Lock &)
{
checkUsable();
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, queue_iter);
return ++queue_iter->hits;
assertValid();
cache_priority->queue.splice(cache_priority->queue.end(), cache_priority->queue, iterator);
return ++iterator->hits;
}
void LRUFileCachePriority::LRUFileCacheIterator::checkUsable() const
void LRUFileCachePriority::LRUIterator::assertValid() const
{
if (queue_iter == LRUQueueIterator{})
if (iterator == LRUQueue::iterator{})
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to use invalid iterator");
}
void LRUFileCachePriority::shuffle(const CacheGuard::Lock &)
{
std::vector<LRUQueueIterator> its;
std::vector<LRUQueue::iterator> its;
its.reserve(queue.size());
for (auto it = queue.begin(); it != queue.end(); ++it)
its.push_back(it);

View File

@ -6,43 +6,48 @@
#include <Common/logger_useful.h>
#include "Interpreters/Cache/Guards.h"
namespace CurrentMetrics
{
extern const Metric FilesystemCacheSizeLimit;
}
namespace DB
{
/// Based on the LRU algorithm implementation, the record with the lowest priority is stored at
/// the head of the queue, and the record with the highest priority is stored at the tail.
class LRUFileCachePriority : public IFileCachePriority
class LRUFileCachePriority final : public IFileCachePriority
{
private:
class LRUFileCacheIterator;
class LRUIterator;
using LRUQueue = std::list<Entry>;
using LRUQueueIterator = typename LRUQueue::iterator;
friend class SLRUFileCachePriority;
public:
LRUFileCachePriority(size_t max_size_, size_t max_elements_) : IFileCachePriority(max_size_, max_elements_)
{
CurrentMetrics::set(CurrentMetrics::FilesystemCacheSizeLimit, max_size_);
}
LRUFileCachePriority(size_t max_size_, size_t max_elements_) : IFileCachePriority(max_size_, max_elements_) {}
size_t getSize(const CacheGuard::Lock &) const override { return current_size; }
size_t getElementsCount(const CacheGuard::Lock &) const override { return current_elements_num; }
Iterator add(KeyMetadataPtr key_metadata, size_t offset, size_t size, const CacheGuard::Lock &) override;
bool canFit(size_t size, const CacheGuard::Lock &) const override;
void pop(const CacheGuard::Lock &) override;
IteratorPtr add( /// NOLINT
KeyMetadataPtr key_metadata,
size_t offset,
size_t size,
const CacheGuard::Lock &,
bool is_startup = false) override;
void removeAll(const CacheGuard::Lock &) override;
void iterate(IterateFunc && func, const CacheGuard::Lock &) override;
bool collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock &) override;
void shuffle(const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) override;
void pop(const CacheGuard::Lock & lock) { remove(queue.begin(), lock); }
private:
void updateElementsCount(int64_t num);
void updateSize(int64_t size);
@ -55,21 +60,38 @@ private:
/// because of invalidated entries.
std::atomic<size_t> current_elements_num = 0;
LRUQueueIterator remove(LRUQueueIterator it);
bool canFit(size_t size, size_t released_size_assumption, size_t released_elements_assumption, const CacheGuard::Lock &) const;
LRUQueue::iterator remove(LRUQueue::iterator it, const CacheGuard::Lock &);
enum class IterationResult
{
BREAK,
CONTINUE,
REMOVE_AND_CONTINUE,
};
using IterateFunc = std::function<IterationResult(LockedKey &, const FileSegmentMetadataPtr &)>;
void iterate(IterateFunc && func, const CacheGuard::Lock &);
LRUIterator move(LRUIterator & it, LRUFileCachePriority & other, const CacheGuard::Lock &);
LRUIterator add(Entry && entry, const CacheGuard::Lock &);
};
class LRUFileCachePriority::LRUFileCacheIterator : public IFileCachePriority::IIterator
class LRUFileCachePriority::LRUIterator : public IFileCachePriority::Iterator
{
friend class LRUFileCachePriority;
friend class SLRUFileCachePriority;
public:
LRUFileCacheIterator(
LRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUQueueIterator queue_iter_);
LRUIterator(LRUFileCachePriority * cache_priority_, LRUQueue::iterator iterator_);
const Entry & getEntry() const override { return *queue_iter; }
LRUIterator(const LRUIterator & other);
LRUIterator & operator =(const LRUIterator & other);
bool operator ==(const LRUIterator & other) const;
Entry & getEntry() override { return *queue_iter; }
const Entry & getEntry() const override { return *iterator; }
size_t use(const CacheGuard::Lock &) override;
size_t increasePriority(const CacheGuard::Lock &) override;
void remove(const CacheGuard::Lock &) override;
@ -77,11 +99,13 @@ public:
void updateSize(int64_t size) override;
QueueEntryType getType() const override { return QueueEntryType::LRU; }
private:
void checkUsable() const;
void assertValid() const;
LRUFileCachePriority * cache_priority;
mutable LRUFileCachePriority::LRUQueueIterator queue_iter;
mutable LRUQueue::iterator iterator;
};
}

View File

@ -29,7 +29,7 @@ struct FileSegmentMetadata : private boost::noncopyable
bool evicting() const { return removal_candidate.load(); }
Priority::Iterator getQueueIterator() const { return file_segment->getQueueIterator(); }
Priority::IteratorPtr getQueueIterator() const { return file_segment->getQueueIterator(); }
FileSegmentPtr file_segment;
std::atomic<bool> removal_candidate{false};

View File

@ -95,7 +95,7 @@ void FileCacheQueryLimit::QueryContext::remove(
records.erase({key, offset});
}
IFileCachePriority::Iterator FileCacheQueryLimit::QueryContext::tryGet(
IFileCachePriority::IteratorPtr FileCacheQueryLimit::QueryContext::tryGet(
const Key & key,
size_t offset,
const CacheGuard::Lock &)

View File

@ -27,7 +27,6 @@ public:
public:
using Key = FileCacheKey;
using Priority = IFileCachePriority;
using PriorityIterator = IFileCachePriority::Iterator;
QueryContext(size_t query_cache_size, bool recache_on_query_limit_exceeded_);
@ -36,7 +35,7 @@ public:
bool recacheOnFileCacheQueryLimitExceeded() const { return recache_on_query_limit_exceeded; }
IFileCachePriority::Iterator tryGet(
Priority::IteratorPtr tryGet(
const Key & key,
size_t offset,
const CacheGuard::Lock &);
@ -53,7 +52,7 @@ public:
const CacheGuard::Lock &);
private:
using Records = std::unordered_map<FileCacheKeyAndOffset, IFileCachePriority::Iterator, FileCacheKeyAndOffsetHash>;
using Records = std::unordered_map<FileCacheKeyAndOffset, Priority::IteratorPtr, FileCacheKeyAndOffsetHash>;
Records records;
LRUFileCachePriority priority;
const bool recache_on_query_limit_exceeded;

View File

@ -0,0 +1,284 @@
#include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/EvictionCandidates.h>
#include <Common/CurrentMetrics.h>
#include <Common/randomSeed.h>
#include <Common/logger_useful.h>
#include <Common/assert_cast.h>
namespace DB
{
namespace
{
size_t getRatio(size_t total, double ratio)
{
return static_cast<size_t>(total * std::clamp(ratio, 0.0, 1.0));
}
}
SLRUFileCachePriority::SLRUFileCachePriority(
size_t max_size_,
size_t max_elements_,
double size_ratio)
: IFileCachePriority(max_size_, max_elements_)
, protected_queue(LRUFileCachePriority(getRatio(max_size_, size_ratio), getRatio(max_elements_, size_ratio)))
, probationary_queue(LRUFileCachePriority(getRatio(max_size_, 1 - size_ratio), getRatio(max_elements_, 1 - size_ratio)))
{
LOG_DEBUG(
log, "Using probationary queue size: {}, protected queue size: {}",
probationary_queue.getSizeLimit(), protected_queue.getSizeLimit());
}
size_t SLRUFileCachePriority::getSize(const CacheGuard::Lock & lock) const
{
return protected_queue.getSize(lock) + probationary_queue.getSize(lock);
}
size_t SLRUFileCachePriority::getElementsCount(const CacheGuard::Lock & lock) const
{
return protected_queue.getElementsCount(lock) + probationary_queue.getElementsCount(lock);
}
bool SLRUFileCachePriority::canFit(size_t size, const CacheGuard::Lock & lock) const
{
return probationary_queue.canFit(size, lock) || protected_queue.canFit(size, lock);
}
IFileCachePriority::IteratorPtr SLRUFileCachePriority::add( /// NOLINT
KeyMetadataPtr key_metadata,
size_t offset,
size_t size,
const CacheGuard::Lock & lock,
bool is_startup)
{
if (is_startup)
{
/// If it is server startup, we put entries in any queue it will fit in,
/// but with preference for probationary queue,
/// because we do not know the distribution between queues after server restart.
if (probationary_queue.canFit(size, lock))
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
else
{
auto lru_iterator = protected_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), true);
}
}
else
{
auto lru_iterator = probationary_queue.add(Entry(key_metadata->key, offset, size, key_metadata), lock);
return std::make_shared<SLRUIterator>(this, std::move(lru_iterator), false);
}
}
bool SLRUFileCachePriority::collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock & lock)
{
/// If `it` is nullptr, then it is the first space reservation attempt
/// for a corresponding file segment, so it will be directly put into probationary queue.
if (!reservee)
{
return probationary_queue.collectCandidatesForEviction(size, stat, res, reservee, finalize_eviction_func, lock);
}
/// If `it` not nullptr (e.g. is already in some queue),
/// we need to check in which queue (protected/probationary) it currently is
/// (in order to know where we need to free space).
if (!assert_cast<SLRUIterator *>(reservee.get())->is_protected)
{
return probationary_queue.collectCandidatesForEviction(size, stat, res, reservee, finalize_eviction_func, lock);
}
/// Entry is in protected queue.
/// Check if we have enough space in protected queue to fit a new size of entry.
/// `size` is the increment to the current entry.size we want to increase.
if (protected_queue.canFit(size, lock))
return true;
/// If not enough space - we need to "downgrade" lowest priority entries from protected
/// queue to probationary queue.
/// The amount of such "downgraded" entries is equal to the amount
/// required to make space for additionary `size` bytes for entry.
auto downgrade_candidates = std::make_shared<EvictionCandidates>();
FileCacheReserveStat downgrade_stat;
FinalizeEvictionFunc noop;
if (!protected_queue.collectCandidatesForEviction(size, downgrade_stat, *downgrade_candidates, reservee, noop, lock))
return false;
const size_t size_to_downgrade = downgrade_stat.stat.releasable_size;
if (!probationary_queue.canFit(size_to_downgrade, lock)
&& !probationary_queue.collectCandidatesForEviction(size_to_downgrade, stat, res, reservee, noop, lock))
return false;
finalize_eviction_func = [=, this](const CacheGuard::Lock & lk) mutable
{
for (const auto & [key, key_candidates] : *downgrade_candidates)
{
for (const auto & candidate : key_candidates.candidates)
{
auto * candidate_it = assert_cast<SLRUIterator *>(candidate->getQueueIterator().get());
candidate_it->lru_iterator = probationary_queue.move(candidate_it->lru_iterator, protected_queue, lk);
candidate_it->is_protected = false;
}
}
};
return true;
}
void SLRUFileCachePriority::increasePriority(SLRUIterator & iterator, const CacheGuard::Lock & lock)
{
/// If entry is already in protected queue,
/// we only need to increase its priority within the protected queue.
if (iterator.is_protected)
{
iterator.lru_iterator.increasePriority(lock);
return;
}
/// Entry is in probationary queue.
/// We need to move it to protected queue.
const size_t size = iterator.getEntry().size;
if (size > protected_queue.getSizeLimit())
{
/// Entry size is bigger than the whole protected queue limit.
/// This is only possible if protected_queue_size_limit is less than max_file_segment_size,
/// which is not possible in any realistic cache configuration.
iterator.lru_iterator.increasePriority(lock);
return;
}
/// Check if there is enough space in protected queue to move entry there.
/// If not - we need to "downgrade" lowest priority entries from protected
/// queue to probationary queue.
EvictionCandidates downgrade_candidates;
FileCacheReserveStat downgrade_stat;
FinalizeEvictionFunc noop;
if (!protected_queue.collectCandidatesForEviction(size, downgrade_stat, downgrade_candidates, {}, noop, lock))
{
/// We cannot make space for entry to be moved to protected queue
/// (not enough releasable file segments).
/// Then just increase its priority within probationary queue.
iterator.lru_iterator.increasePriority(lock);
return;
}
/// The amount of such "downgraded" entries is equal to the amount
/// required to make space for entry we want to insert.
const size_t size_to_downgrade = downgrade_stat.stat.releasable_count;
size_t size_to_free = 0;
if (size_to_downgrade && size_to_downgrade > size)
size_to_free = size_to_downgrade - size;
/// Now we need to check if those "downgrade" candidates can actually
/// be moved to probationary queue.
EvictionCandidates eviction_candidates;
FileCacheReserveStat stat;
if (size_to_free)
{
if (!probationary_queue.collectCandidatesForEviction(size_to_free, stat, eviction_candidates, {}, noop, lock))
{
/// "downgrade" candidates cannot be moved to probationary queue,
/// so entry cannot be moved to protected queue as well.
/// Then just increase its priority within probationary queue.
iterator.lru_iterator.increasePriority(lock);
return;
}
/// Make space for "downgrade" candidates.
eviction_candidates.evict(nullptr, lock);
}
/// All checks passed, now we can move downgrade candidates to
/// probationary queue and our entry to protected queue.
Entry entry_copy = iterator.getEntry();
iterator.lru_iterator.remove(lock);
for (const auto & [key, key_candidates] : downgrade_candidates)
{
for (const auto & candidate : key_candidates.candidates)
{
auto * candidate_it = assert_cast<SLRUIterator *>(candidate->getQueueIterator().get());
candidate_it->lru_iterator = probationary_queue.move(candidate_it->lru_iterator, protected_queue, lock);
candidate_it->is_protected = false;
}
}
iterator.lru_iterator = protected_queue.add(std::move(entry_copy), lock);
iterator.is_protected = true;
}
std::vector<FileSegmentInfo> SLRUFileCachePriority::dump(FileCache & cache, const CacheGuard::Lock & lock)
{
auto res = probationary_queue.dump(cache, lock);
auto part_res = protected_queue.dump(cache, lock);
res.insert(res.end(), part_res.begin(), part_res.end());
return res;
}
void SLRUFileCachePriority::shuffle(const CacheGuard::Lock & lock)
{
protected_queue.shuffle(lock);
probationary_queue.shuffle(lock);
}
SLRUFileCachePriority::SLRUIterator::SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUFileCachePriority::LRUIterator && lru_iterator_,
bool is_protected_)
: cache_priority(cache_priority_)
, lru_iterator(lru_iterator_)
, is_protected(is_protected_)
{
}
const SLRUFileCachePriority::Entry & SLRUFileCachePriority::SLRUIterator::getEntry() const
{
assertValid();
return lru_iterator.getEntry();
}
size_t SLRUFileCachePriority::SLRUIterator::increasePriority(const CacheGuard::Lock & lock)
{
assertValid();
cache_priority->increasePriority(*this, lock);
return getEntry().hits;
}
void SLRUFileCachePriority::SLRUIterator::updateSize(int64_t size)
{
assertValid();
lru_iterator.updateSize(size);
}
void SLRUFileCachePriority::SLRUIterator::invalidate()
{
assertValid();
lru_iterator.invalidate();
}
void SLRUFileCachePriority::SLRUIterator::remove(const CacheGuard::Lock & lock)
{
assertValid();
lru_iterator.remove(lock);
}
void SLRUFileCachePriority::SLRUIterator::assertValid() const
{
lru_iterator.assertValid();
}
}

View File

@ -0,0 +1,85 @@
#pragma once
#include <Interpreters/Cache/LRUFileCachePriority.h>
#include <Common/logger_useful.h>
namespace DB
{
/// Based on the SLRU algorithm implementation, the record with the lowest priority is stored at
/// the head of the queue, and the record with the highest priority is stored at the tail.
class SLRUFileCachePriority : public IFileCachePriority
{
private:
using LRUIterator = LRUFileCachePriority::LRUIterator;
using LRUQueue = std::list<Entry>;
public:
class SLRUIterator;
SLRUFileCachePriority(size_t max_size_, size_t max_elements_, double size_ratio);
size_t getSize(const CacheGuard::Lock & lock) const override;
size_t getElementsCount(const CacheGuard::Lock &) const override;
bool canFit(size_t size, const CacheGuard::Lock &) const override;
IteratorPtr add( /// NOLINT
KeyMetadataPtr key_metadata,
size_t offset,
size_t size,
const CacheGuard::Lock &,
bool is_startup = false) override;
bool collectCandidatesForEviction(
size_t size,
FileCacheReserveStat & stat,
EvictionCandidates & res,
IFileCachePriority::IteratorPtr reservee,
FinalizeEvictionFunc & finalize_eviction_func,
const CacheGuard::Lock &) override;
void shuffle(const CacheGuard::Lock &) override;
std::vector<FileSegmentInfo> dump(FileCache & cache, const CacheGuard::Lock &) override;
private:
LRUFileCachePriority protected_queue;
LRUFileCachePriority probationary_queue;
Poco::Logger * log = &Poco::Logger::get("SLRUFileCachePriority");
void increasePriority(SLRUIterator & iterator, const CacheGuard::Lock & lock);
};
class SLRUFileCachePriority::SLRUIterator : public IFileCachePriority::Iterator
{
friend class SLRUFileCachePriority;
public:
SLRUIterator(
SLRUFileCachePriority * cache_priority_,
LRUIterator && lru_iterator_,
bool is_protected_);
const Entry & getEntry() const override;
size_t increasePriority(const CacheGuard::Lock &) override;
void remove(const CacheGuard::Lock &) override;
void invalidate() override;
void updateSize(int64_t size) override;
QueueEntryType getType() const override { return is_protected ? QueueEntryType::SLRU_Protected : QueueEntryType::SLRU_Probationary; }
private:
void assertValid() const;
SLRUFileCachePriority * cache_priority;
mutable LRUIterator lru_iterator;
bool is_protected;
};
}

View File

@ -19,6 +19,7 @@
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/SLRUFileCachePriority.h>
#include <Interpreters/Context.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <base/hex.h>
@ -83,11 +84,12 @@ using HolderPtr = FileSegmentsHolderPtr;
fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
std::string cache_base_path2 = caches_dir / "cache2" / "";
void assertEqual(const FileSegmentsHolderPtr & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "File segments: ";
std::cerr << "\nFile segments: ";
for (const auto & file_segment : *file_segments)
std::cerr << file_segment->range().toString() << ", ";
@ -115,9 +117,12 @@ void assertEqual(const FileSegmentsHolderPtr & file_segments, const Ranges & exp
void assertEqual(const std::vector<FileSegment::Info> & file_segments, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "File segments: ";
std::cerr << "\nFile segments: ";
for (const auto & file_segment : file_segments)
std::cerr << FileSegment::Range(file_segment.range_left, file_segment.range_right).toString() << ", ";
std::cerr << "\nExpected: ";
for (const auto & r : expected_ranges)
std::cerr << r.toString() << ", ";
ASSERT_EQ(file_segments.size(), expected_ranges.size());
@ -141,6 +146,49 @@ void assertEqual(const std::vector<FileSegment::Info> & file_segments, const Ran
}
}
void assertProtectedOrProbationary(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected, bool assert_protected)
{
std::cerr << "\nFile segments: ";
std::vector<Range> res;
for (const auto & f : file_segments)
{
auto range = FileSegment::Range(f.range_left, f.range_right);
bool is_protected = (f.queue_entry_type == FileCacheQueueEntryType::SLRU_Protected);
bool is_probationary = (f.queue_entry_type == FileCacheQueueEntryType::SLRU_Probationary);
ASSERT_TRUE(is_probationary || is_protected);
std::cerr << fmt::format("{} (protected: {})", range.toString(), is_protected) << ", ";
if ((is_protected && assert_protected) || (!is_protected && !assert_protected))
{
res.push_back(range);
}
}
std::cerr << "\nExpected: ";
for (const auto & range : expected)
{
std::cerr << range.toString() << ", ";
}
ASSERT_EQ(res.size(), expected.size());
for (size_t i = 0; i < res.size(); ++i)
{
ASSERT_EQ(res[i], expected[i]);
}
}
void assertProtected(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected)
{
std::cerr << "\nAssert protected";
assertProtectedOrProbationary(file_segments, expected, true);
}
void assertProbationary(const std::vector<FileSegmentInfo> & file_segments, const Ranges & expected)
{
std::cerr << "\nAssert probationary";
assertProtectedOrProbationary(file_segments, expected, false);
}
FileSegment & get(const HolderPtr & holder, int i)
{
auto it = std::next(holder->begin(), i);
@ -151,7 +199,7 @@ FileSegment & get(const HolderPtr & holder, int i)
void download(FileSegment & file_segment)
{
std::cerr << "Downloading range " << file_segment.range().toString() << "\n";
std::cerr << "\nDownloading range " << file_segment.range().toString() << "\n";
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
@ -184,7 +232,14 @@ void download(const HolderPtr & holder)
void increasePriority(const HolderPtr & holder)
{
for (auto & it : *holder)
it->use();
it->increasePriority();
}
void increasePriority(const HolderPtr & holder, size_t pos)
{
FileSegments::iterator it = holder->begin();
std::advance(it, pos);
(*it)->increasePriority();
}
class FileCacheTest : public ::testing::Test
@ -221,7 +276,10 @@ public:
if (fs::exists(cache_base_path))
fs::remove_all(cache_base_path);
if (fs::exists(cache_base_path2))
fs::remove_all(cache_base_path2);
fs::create_directories(cache_base_path);
fs::create_directories(cache_base_path2);
}
void TearDown() override
@ -233,7 +291,7 @@ public:
pcg64 rng;
};
TEST_F(FileCacheTest, get)
TEST_F(FileCacheTest, LRUPolicy)
{
DB::ThreadStatus thread_status;
@ -1057,3 +1115,206 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
ASSERT_EQ(stream.getSize(), 62);
}
}
TEST_F(FileCacheTest, SLRUPolicy)
{
DB::ThreadStatus thread_status;
std::string query_id = "query_id"; /// To work with cache need query_id and query context.
Poco::XML::DOMParser dom_parser;
std::string xml(R"CONFIG(<clickhouse>
</clickhouse>)CONFIG");
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
getMutableContext().context->setConfig(config);
auto query_context = DB::Context::createCopy(getContext().context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(query_id);
chassert(&DB::CurrentThread::get() == &thread_status);
DB::CurrentThread::QueryScope query_scope_holder(query_context);
DB::FileCacheSettings settings;
settings.base_path = cache_base_path;
settings.max_size = 40;
settings.max_elements = 6;
settings.boundary_alignment = 1;
settings.cache_policy = "SLRU";
settings.slru_size_ratio = 0.5;
const size_t file_size = -1; // the value doesn't really matter because boundary_alignment == 1.
size_t file_cache_name = 0;
{
auto cache = DB::FileCache(std::to_string(++file_cache_name), settings);
cache.initialize();
auto key = cache.createKeyForPath("key1");
auto add_range = [&](size_t offset, size_t size)
{
std::cerr << "Add [" << offset << ", " << offset + size - 1 << "]" << std::endl;
auto holder = cache.getOrSet(key, offset, size, file_size, {});
assertEqual(holder, { Range(offset, offset + size - 1) }, { State::EMPTY });
download(holder->front());
assertEqual(holder, { Range(offset, offset + size - 1) }, { State::DOWNLOADED });
};
auto check_covering_range = [&](size_t offset, size_t size, Ranges covering_ranges)
{
auto holder = cache.getOrSet(key, offset, size, file_size, {});
std::vector<State> states(covering_ranges.size(), State::DOWNLOADED);
assertEqual(holder, covering_ranges, states);
increasePriority(holder);
};
add_range(0, 10);
add_range(10, 5);
assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14) });
assertEqual(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
ASSERT_EQ(cache.getFileSegmentsNum(), 2);
ASSERT_EQ(cache.getUsedCacheSize(), 15);
assertProbationary(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
assertProtected(cache.dumpQueue(), Ranges{});
check_covering_range(9, 1, { Range(0, 9) });
assertEqual(cache.dumpQueue(), { Range(10, 14), Range(0, 9) });
check_covering_range(10, 1, { Range(10, 14) });
assertEqual(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
assertProbationary(cache.dumpQueue(), Ranges{});
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
add_range(17, 4);
assertEqual(cache.dumpQueue(), { Range(17, 20), Range(0, 9), Range(10, 14) });
add_range(24, 3);
assertEqual(cache.dumpQueue(), { Range(17, 20), Range(24, 26), Range(0, 9), Range(10, 14) });
add_range(27, 1);
assertEqual(cache.dumpQueue(), { Range(17, 20), Range(24, 26), Range(27, 27), Range(0, 9), Range(10, 14) });
assertProbationary(cache.dumpQueue(), { Range(17, 20), Range(24, 26), Range(27, 27) });
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14), Range(17, 20), Range(24, 26), Range(27, 27) });
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getUsedCacheSize(), 23);
add_range(28, 3);
assertEqual(cache.dumpQueue(), { Range(24, 26), Range(27, 27), Range(28, 30), Range(0, 9), Range(10, 14) });
assertProbationary(cache.dumpQueue(), { Range(24, 26), Range(27, 27), Range(28, 30) });
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(10, 14) });
check_covering_range(4, 1, { Range(0, 9) });
assertProbationary(cache.dumpQueue(), { Range(24, 26), Range(27, 27), Range(28, 30) });
assertProtected(cache.dumpQueue(), { Range(10, 14), Range(0, 9) });
check_covering_range(27, 3, { Range(27, 27), Range(28, 30) });
assertProbationary(cache.dumpQueue(), { Range(24, 26), Range(10, 14) });
assertProtected(cache.dumpQueue(), { Range(0, 9), Range(27, 27), Range(28, 30) });
assertEqual(cache.getFileSegmentInfos(key), { Range(0, 9), Range(10, 14), Range(24, 26), Range(27, 27), Range(28, 30) });
ASSERT_EQ(cache.getFileSegmentsNum(), 5);
ASSERT_EQ(cache.getUsedCacheSize(), 22);
}
{
ReadSettings read_settings;
read_settings.enable_filesystem_cache = true;
read_settings.local_fs_method = LocalFSReadMethod::pread;
auto write_file = [](const std::string & filename, const std::string & s)
{
std::string file_path = fs::current_path() / filename;
auto wb = std::make_unique<WriteBufferFromFile>(file_path, DBMS_DEFAULT_BUFFER_SIZE);
wb->write(s.data(), s.size());
wb->next();
wb->finalize();
return file_path;
};
DB::FileCacheSettings settings2;
settings2.base_path = cache_base_path2;
settings2.max_file_segment_size = 5;
settings2.max_size = 30;
settings2.max_elements = 6;
settings2.boundary_alignment = 1;
settings2.cache_policy = "SLRU";
settings2.slru_size_ratio = 0.5;
auto cache = std::make_shared<DB::FileCache>("slru_2", settings2);
cache->initialize();
auto read_and_check = [&](const std::string & file, const FileCacheKey & key, const std::string & expect_result)
{
auto read_buffer_creator = [&]()
{
return createReadBufferFromFileBase(file, read_settings, std::nullopt, std::nullopt);
};
auto cached_buffer = std::make_shared<CachedOnDiskReadBufferFromFile>(
file, key, cache, read_buffer_creator, read_settings, "test", expect_result.size(), false, false, std::nullopt, nullptr);
WriteBufferFromOwnString result;
copyData(*cached_buffer, result);
ASSERT_EQ(result.str(), expect_result);
};
std::string data1(15, '*');
auto file1 = write_file("test1", data1);
auto key1 = cache->createKeyForPath(file1);
read_and_check(file1, key1, data1);
assertEqual(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProtected(cache->dumpQueue(), Ranges{});
read_and_check(file1, key1, data1);
assertEqual(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
assertProbationary(cache->dumpQueue(), Ranges{});
assertProtected(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
std::string data2(10, '*');
auto file2 = write_file("test2", data2);
auto key2 = cache->createKeyForPath(file2);
read_and_check(file2, key2, data2);
auto dump = cache->dumpQueue();
assertEqual(dump, { Range(0, 4), Range(5, 9), Range(0, 4), Range(5, 9), Range(10, 14) });
ASSERT_EQ(dump[0].key, key2);
ASSERT_EQ(dump[1].key, key2);
ASSERT_EQ(dump[2].key, key1);
ASSERT_EQ(dump[3].key, key1);
ASSERT_EQ(dump[4].key, key1);
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9) });
assertProtected(cache->dumpQueue(), { Range(0, 4), Range(5, 9), Range(10, 14) });
read_and_check(file2, key2, data2);
dump = cache->dumpQueue();
assertEqual(dump, { Range(0, 4), Range(5, 9), Range(10, 14), Range(0, 4), Range(5, 9) });
ASSERT_EQ(dump[0].key, key1);
ASSERT_EQ(dump[1].key, key1);
ASSERT_EQ(dump[2].key, key1);
ASSERT_EQ(dump[3].key, key2);
ASSERT_EQ(dump[4].key, key2);
assertProbationary(cache->dumpQueue(), { Range(0, 4), Range(5, 9) });
assertProtected(cache->dumpQueue(), { Range(10, 14), Range(0, 4), Range(5, 9) });
}
}

View File

@ -13,9 +13,11 @@
<type>cache</type>
<disk>s3_disk</disk>
<path>s3_cache/</path>
<max_size>128Mi</max_size>
<max_size>64Mi</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<cache_policy>LRU</cache_policy>
<slru_size_ratio>0.3</slru_size_ratio>
</s3_cache>
<!-- local disks -->
<local_disk>