Do not take KeyGuard::Lock under CacheMetadataGuard::Lock to make metadata lock truly lightweight

This commit is contained in:
kssenii 2023-03-26 18:37:26 +02:00
parent 88f631b297
commit e6e71dc832
5 changed files with 238 additions and 76 deletions

View File

@ -115,6 +115,7 @@ void FileCache::initialize()
}
is_initialized = true;
cleanup_task->activate();
cleanup_task->scheduleAfter(delayed_cleanup_interval_ms);
}
@ -833,43 +834,46 @@ void FileCache::loadMetadata()
}
size_t total_size = 0;
for (auto key_prefix_it = fs::directory_iterator{cache_base_path}; key_prefix_it != fs::directory_iterator(); ++key_prefix_it)
for (auto key_prefix_it = fs::directory_iterator{cache_base_path}; key_prefix_it != fs::directory_iterator();)
{
const fs::path key_prefix_directory = key_prefix_it->path();
key_prefix_it++;
if (!key_prefix_it->is_directory())
if (!fs::is_directory(key_prefix_directory))
{
if (key_prefix_directory.filename() != "status")
{
LOG_WARNING(
log, "Unexpected file {} (not a directory), will skip it",
key_prefix_it->path().string());
key_prefix_directory.string());
}
continue;
}
if (fs::is_empty(key_prefix_directory))
{
LOG_DEBUG(log, "Removing empty key prefix directory: {}", key_prefix_directory.string());
fs::remove(key_prefix_directory);
continue;
}
fs::directory_iterator key_it{};
for (; key_it != fs::directory_iterator(); ++key_it)
for (fs::directory_iterator key_it{key_prefix_directory}; key_it != fs::directory_iterator();)
{
const fs::path key_directory = key_it->path();
++key_it;
if (!key_it->is_directory())
if (!fs::is_directory(key_directory))
{
LOG_DEBUG(
log,
"Unexpected file: {} (not a directory). Expected a directory",
key_it->path().string());
key_directory.string());
continue;
}
if (fs::is_empty(key_directory))
{
LOG_DEBUG(log, "Removing empty key directory: {}", key_directory.string());
fs::remove(key_directory);
continue;
}
@ -932,7 +936,7 @@ void FileCache::loadMetadata()
log,
"Cache capacity changed (max size: {}, used: {}), "
"cached file `{}` does not fit in cache anymore (size: {})",
queue.getSizeLimit(), queue.getSize(), key_it->path().string(), size);
queue.getSizeLimit(), queue.getSize(), key_directory.string(), size);
fs::remove(offset_it->path());
}
@ -960,6 +964,8 @@ void FileCache::loadMetadata()
}
LockedKeyMetadataPtr FileCache::lockKeyMetadata(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool is_initial_load)
{
KeyMetadataPtr key_metadata;
{
auto lock = metadata.lock();
@ -971,36 +977,54 @@ LockedKeyMetadataPtr FileCache::lockKeyMetadata(const Key & key, KeyNotFoundPoli
else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
return nullptr;
it = metadata.emplace(key, std::make_shared<KeyMetadata>(/* created_base_directory */is_initial_load)).first;
it = metadata.emplace(
key,
std::make_shared<KeyMetadata>(/* base_directory_already_exists */is_initial_load, metadata.getCleanupQueue())).first;
}
auto key_metadata = it->second;
key_metadata = it->second;
}
{
auto key_lock = key_metadata->lock();
if (key_metadata->inCleanupQueue(key_lock))
const auto cleanup_state = key_metadata->getCleanupState(key_lock);
if (cleanup_state == KeyMetadata::CleanupState::NOT_SUBMITTED)
{
/// No race is guaranteed because KeyGuard::Lock and CacheMetadataGuard::Lock are held.
metadata.getCleanupQueue().remove(key);
return std::make_unique<LockedKeyMetadata>(key, key_metadata, std::move(key_lock), getPathInLocalCache(key));
}
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key.toString());
else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
return nullptr;
if (cleanup_state == KeyMetadata::CleanupState::SUBMITTED_TO_CLEANUP_QUEUE)
{
key_metadata->removeFromCleanupQueue(key, key_lock);
return std::make_unique<LockedKeyMetadata>(key, key_metadata, std::move(key_lock), getPathInLocalCache(key));
}
return std::make_unique<LockedKeyMetadata>(
key, key_metadata, std::move(key_lock), getPathInLocalCache(key), metadata.getCleanupQueue());
chassert(cleanup_state == KeyMetadata::CleanupState::CLEANED_BY_CLEANUP_THREAD);
chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY);
}
/// Now we are at a case:
/// cleanup_state == KeyMetadata::CleanupState::CLEANED_BY_CLEANUP_THREAD
/// and KeyNotFoundPolicy == CREATE_EMPTY
/// Retry.
return lockKeyMetadata(key, key_not_found_policy);
}
LockedKeyMetadataPtr FileCache::lockKeyMetadata(const Key & key, KeyMetadataPtr key_metadata) const
{
auto key_lock = key_metadata->lock();
if (key_metadata->inCleanupQueue(key_lock))
if (key_metadata->getCleanupState(key_lock) != KeyMetadata::CleanupState::NOT_SUBMITTED)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot lock key: it was removed from cache");
return std::make_unique<LockedKeyMetadata>(
key, key_metadata, std::move(key_lock), getPathInLocalCache(key), metadata.getCleanupQueue());
return std::make_unique<LockedKeyMetadata>(key, key_metadata, std::move(key_lock), getPathInLocalCache(key));
}
void FileCache::iterateCacheMetadata(const CacheMetadataGuard::Lock &, std::function<void(KeyMetadata &)> && func)
@ -1010,18 +1034,28 @@ void FileCache::iterateCacheMetadata(const CacheMetadataGuard::Lock &, std::func
{
auto key_lock = key_metadata->lock();
if (key_metadata->inCleanupQueue(key_lock))
if (key_metadata->getCleanupState(key_lock) != KeyMetadata::CleanupState::NOT_SUBMITTED)
continue;
func(*key_metadata);
}
}
/// Deactivates the cleanup task that initialize() activated and scheduled.
FileCache::~FileCache()
{
cleanup_task->deactivate();
}
/// Runs one pass of metadata cleanup by delegating to metadata.doCleanup().
/// Declared in the public section of FileCache, so it can be called directly
/// (the unit test does so) instead of waiting for the scheduled cleanup task.
void FileCache::cleanup()
{
metadata.doCleanup();
}
void FileCache::cleanupThreadFunc()
{
try
{
metadata.doCleanup();
cleanup();
}
catch (...)
{

View File

@ -46,6 +46,8 @@ public:
FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_);
~FileCache();
void initialize();
const String & getBasePath() const { return cache_base_path; }
@ -112,6 +114,8 @@ public:
LockedKeyMetadataPtr lockKeyMetadata(const Key & key, KeyMetadataPtr key_metadata) const;
void cleanup();
/// For per query cache limit.
struct QueryContextHolder : private boost::noncopyable
{

View File

@ -2,6 +2,7 @@
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/LockedFileCachePriority.h>
#include <Common/logger_useful.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -103,35 +104,27 @@ std::string KeyMetadata::toString() const
return result;
}
void CleanupQueue::add(const FileCacheKey & key)
void KeyMetadata::addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &)
{
std::lock_guard lock(mutex);
keys.insert(key);
cleanup_queue.add(key);
cleanup_state = CleanupState::SUBMITTED_TO_CLEANUP_QUEUE;
}
void CleanupQueue::remove(const FileCacheKey & key)
void KeyMetadata::removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &)
{
std::lock_guard lock(mutex);
bool erased = keys.erase(key);
if (!erased)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key to erase: {}", key.toString());
}
bool CleanupQueue::tryPop(FileCacheKey & key)
{
std::lock_guard lock(mutex);
if (keys.empty())
return false;
auto it = keys.begin();
key = *it;
keys.erase(it);
return true;
cleanup_queue.remove(key);
cleanup_state = CleanupState::NOT_SUBMITTED;
}
void CacheMetadata::doCleanup()
{
auto lock = guard.lock();
LOG_INFO(
&Poco::Logger::get("FileCacheCleanupThread"),
"Performing background cleanup (size: {})",
cleanup_queue.getSize());
/// Let's mention this case.
/// This metadata cleanup is delayed so what if we marked key as deleted and
/// put it to deletion queue, but then the same key was added to cache before
@ -146,6 +139,15 @@ void CacheMetadata::doCleanup()
if (it == end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in metadata", cleanup_key.toString());
auto key_metadata = it->second;
auto key_lock = key_metadata->lock();
/// In lockKeyMetadata we extract key metadata from cache metadata
/// under CacheMetadataGuard::Lock, but take KeyGuard::Lock only after
/// CacheMetadataGuard::Lock has been released, so we must take that
/// into account here.
if (key_metadata->getCleanupState(key_lock) == KeyMetadata::CleanupState::NOT_SUBMITTED)
continue;
erase(it);
try
@ -170,25 +172,21 @@ LockedKeyMetadata::LockedKeyMetadata(
const FileCacheKey & key_,
std::shared_ptr<KeyMetadata> key_metadata_,
KeyGuard::Lock && lock_,
const std::string & key_path_,
CleanupQueue & cleanup_keys_metadata_queue_)
const std::string & key_path_)
: key(key_)
, key_path(key_path_)
, key_metadata(key_metadata_)
, lock(std::move(lock_))
, cleanup_keys_metadata_queue(cleanup_keys_metadata_queue_)
, log(&Poco::Logger::get("LockedKeyMetadata"))
{
}
LockedKeyMetadata::~LockedKeyMetadata()
{
/// Someone might still need this directory.
if (!key_metadata->empty())
return;
cleanup_keys_metadata_queue.add(key);
key_metadata->in_cleanup_queue = true;
key_metadata->addToCleanupQueue(key, lock);
}
void LockedKeyMetadata::createKeyDirectoryIfNot()
@ -269,4 +267,40 @@ void LockedKeyMetadata::shrinkFileSegmentToDownloadedSize(
assert(file_segment_metadata->size() == entry.size);
}
/// Registers `key` in the cleanup queue while holding the queue mutex.
/// Throws LOGICAL_ERROR if the key is already present in the queue.
void CleanupQueue::add(const FileCacheKey & key)
{
    std::lock_guard queue_lock(mutex);

    const bool newly_queued = keys.insert(key).second;
    if (newly_queued)
        return;

    throw Exception(
        ErrorCodes::LOGICAL_ERROR, "Key {} is already in removal queue", key.toString());
}
/// Withdraws `key` from the cleanup queue while holding the queue mutex.
/// Throws LOGICAL_ERROR if the key was not in the queue.
void CleanupQueue::remove(const FileCacheKey & key)
{
    std::lock_guard queue_lock(mutex);

    /// erase() reports how many elements were removed; zero means the key was absent.
    if (keys.erase(key) == 0)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in removal queue", key.toString());
}
/// Takes an arbitrary key out of the queue and stores it into `key`.
/// Returns false (leaving `key` untouched) when the queue is empty, true otherwise.
bool CleanupQueue::tryPop(FileCacheKey & key)
{
    std::lock_guard queue_lock(mutex);

    const auto first = keys.begin();
    if (first == keys.end())
        return false;

    key = *first;
    keys.erase(first);
    return true;
}
/// Returns the number of keys currently in the queue, read under the queue mutex.
size_t CleanupQueue::getSize() const
{
    std::lock_guard queue_lock(mutex);
    const size_t queued_keys = keys.size();
    return queued_keys;
}
}

View File

@ -11,6 +11,7 @@ using FileSegmentPtr = std::shared_ptr<FileSegment>;
struct LockedKeyMetadata;
class LockedCachePriority;
struct KeysQueue;
struct CleanupQueue;
struct FileSegmentMetadata : private boost::noncopyable
@ -36,12 +37,12 @@ struct FileSegmentMetadata : private boost::noncopyable
: file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {}
};
struct KeyMetadata : public std::map<size_t, FileSegmentMetadata>, private boost::noncopyable
{
friend struct LockedKeyMetadata;
public:
explicit KeyMetadata(bool created_base_directory_) : created_base_directory(created_base_directory_) {}
explicit KeyMetadata(bool created_base_directory_, CleanupQueue & cleanup_queue_)
: created_base_directory(created_base_directory_), cleanup_queue(cleanup_queue_) {}
const FileSegmentMetadata * getByOffset(size_t offset) const;
FileSegmentMetadata * getByOffset(size_t offset);
@ -55,30 +56,41 @@ public:
bool createdBaseDirectory(const KeyGuard::Lock &) const { return created_base_directory; }
bool inCleanupQueue(const KeyGuard::Lock &) const { return in_cleanup_queue; }
/// Cleanup state of a key; read through getCleanupState() under KeyGuard::Lock.
enum class CleanupState
{
/// Initial value of cleanup_state; also restored by removeFromCleanupQueue().
NOT_SUBMITTED,
/// Set by addToCleanupQueue() once the key has been added to the cleanup queue.
SUBMITTED_TO_CLEANUP_QUEUE,
/// NOTE(review): presumably set after the cleanup thread has processed the key;
/// the assignment is not visible in this view — confirm.
CLEANED_BY_CLEANUP_THREAD,
};
CleanupState getCleanupState(const KeyGuard::Lock &) const { return cleanup_state; }
void addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &);
void removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &);
private:
mutable KeyGuard guard;
bool created_base_directory = false;
bool in_cleanup_queue = false;
CleanupState cleanup_state = CleanupState::NOT_SUBMITTED;
CleanupQueue & cleanup_queue;
};
using KeyMetadataPtr = std::shared_ptr<KeyMetadata>;
struct CleanupQueue
{
friend struct CacheMetadata;
public:
void add(const FileCacheKey & key);
void remove(const FileCacheKey & key);
size_t getSize() const;
private:
bool tryPop(FileCacheKey & key);
std::unordered_set<FileCacheKey> keys;
std::mutex mutex;
mutable std::mutex mutex;
};
struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>, private boost::noncopyable
@ -88,19 +100,21 @@ public:
CacheMetadataGuard::Lock lock() { return guard.lock(); }
CleanupQueue & getCleanupQueue() const { return cleanup_queue; }
void removeFromCleanupQueue(const FileCacheKey & key, const CacheMetadataGuard::Lock &) const;
void doCleanup();
CleanupQueue & getCleanupQueue() { return cleanup_queue; }
private:
void addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &);
void removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &);
const std::string base_directory;
CacheMetadataGuard guard;
mutable CleanupQueue cleanup_queue;
CleanupQueue cleanup_queue;
};
/**
* `LockedKeyMetadata` is an object which makes sure that as long as it exists the following is true:
* 1. the key cannot be removed from cache
@ -119,8 +133,7 @@ struct LockedKeyMetadata : private boost::noncopyable
const FileCacheKey & key_,
std::shared_ptr<KeyMetadata> key_metadata_,
KeyGuard::Lock && key_lock_,
const std::string & key_path_,
CleanupQueue & cleanup_keys_metadata_queue_);
const std::string & key_path_);
~LockedKeyMetadata();
@ -138,10 +151,9 @@ struct LockedKeyMetadata : private boost::noncopyable
private:
const FileCacheKey key;
const std::string & key_path;
const std::string key_path;
const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
CleanupQueue & cleanup_keys_metadata_queue;
Poco::Logger * log;
};

View File

@ -16,6 +16,9 @@
#include <filesystem>
#include <thread>
#include <DataTypes/DataTypesNumber.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DOM/DOMParser.h>
#include <base/sleep.h>
#include <Poco/ConsoleChannel.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
@ -63,6 +66,7 @@ using HolderPtr = FileSegmentsHolderPtr;
fs::path caches_dir = fs::current_path() / "lru_cache_test";
std::string cache_base_path = caches_dir / "cache1" / "";
void assertEqual(const HolderPtr & holder, const Ranges & expected_ranges, const States & expected_states = {})
{
std::cerr << "Holder: " << holder->toString() << "\n";
@ -131,7 +135,6 @@ void download(const HolderPtr & holder)
class FileCacheTest : public ::testing::Test
{
public:
static void setupLogs(const std::string & level)
{
Poco::AutoPtr<Poco::ConsoleChannel> channel(new Poco::ConsoleChannel(std::cerr));
@ -165,6 +168,14 @@ TEST_F(FileCacheTest, get)
/// To work with cache need query_id and query context.
std::string query_id = "query_id";
Poco::XML::DOMParser dom_parser;
std::string xml(R"CONFIG(<clickhouse>
</clickhouse>)CONFIG");
Poco::AutoPtr<Poco::XML::Document> document = dom_parser.parseString(xml);
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(document);
getMutableContext().context->setConfig(config);
auto query_context = DB::Context::createCopy(getContext().context);
query_context->makeQueryContext();
query_context->setCurrentQueryId(query_id);
@ -178,23 +189,36 @@ TEST_F(FileCacheTest, get)
std::cerr << "Step 1\n";
auto cache = FileCache(cache_base_path, settings);
std::cerr << "Step 1\n";
cache.initialize();
std::cerr << "Step 1\n";
auto key = cache.createKeyForPath("key1");
std::cerr << "Step 1\n";
{
std::cerr << "Step 1\n";
auto holder = cache.getOrSet(key, 0, 10, {}); /// Add range [0, 9]
std::cerr << "Step 1\n";
assertEqual(holder, { Range(0, 9) }, { State::EMPTY });
std::cerr << "Step 1\n";
download(holder->front());
std::cerr << "Step 1\n";
assertEqual(holder, { Range(0, 9) }, { State::DOWNLOADED });
std::cerr << "Step 1\n";
}
/// Current cache: [__________]
/// ^ ^
/// 0 9
std::cerr << "Step 1\n";
assertEqual(cache.getSnapshot(key), { Range(0, 9) });
std::cerr << "Step 1\n";
assertEqual(cache.dumpQueue(), { Range(0, 9) });
std::cerr << "Step 1\n";
ASSERT_EQ(cache.getFileSegmentsNum(), 1);
std::cerr << "Step 1\n";
ASSERT_EQ(cache.getUsedCacheSize(), 10);
std::cerr << "Step 1\n";
std::cerr << "Step 2\n";
@ -520,6 +544,60 @@ TEST_F(FileCacheTest, get)
{ State::EMPTY, State::EMPTY, State::EMPTY });
}
std::cerr << "Step 13\n";
{
/// Test delayed cleanup
auto cache = FileCache(cache_base_path, settings);
cache.initialize();
cache.cleanup();
const auto key = cache.createKeyForPath("key10");
const auto key_path = cache.getPathInLocalCache(key);
cache.removeAllReleasable();
ASSERT_EQ(cache.getUsedCacheSize(), 0);
ASSERT_TRUE(!fs::exists(key_path));
ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path()));
download(cache.getOrSet(key, 0, 10, {}));
ASSERT_EQ(cache.getUsedCacheSize(), 10);
ASSERT_TRUE(fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular)));
cache.removeAllReleasable();
ASSERT_EQ(cache.getUsedCacheSize(), 0);
ASSERT_TRUE(fs::exists(key_path));
ASSERT_TRUE(!fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular)));
cache.cleanup();
ASSERT_TRUE(!fs::exists(key_path));
ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path()));
}
std::cerr << "Step 14\n";
{
/// Test background thread delayed cleanup
auto settings2{settings};
settings2.delayed_cleanup_interval_ms = 0;
auto cache = FileCache(cache_base_path, settings2);
cache.initialize();
const auto key = cache.createKeyForPath("key10");
const auto key_path = cache.getPathInLocalCache(key);
cache.removeAllReleasable();
ASSERT_EQ(cache.getUsedCacheSize(), 0);
ASSERT_TRUE(!fs::exists(key_path));
ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path()));
download(cache.getOrSet(key, 0, 10, {}));
ASSERT_EQ(cache.getUsedCacheSize(), 10);
ASSERT_TRUE(fs::exists(key_path));
cache.removeAllReleasable();
ASSERT_EQ(cache.getUsedCacheSize(), 0);
sleepForSeconds(2);
ASSERT_TRUE(!fs::exists(key_path));
}
}
TEST_F(FileCacheTest, writeBuffer)