diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index 0db58f2ab37..8c8959724d3 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -115,6 +115,7 @@ void FileCache::initialize() } is_initialized = true; + cleanup_task->activate(); cleanup_task->scheduleAfter(delayed_cleanup_interval_ms); } @@ -833,43 +834,46 @@ void FileCache::loadMetadata() } size_t total_size = 0; - for (auto key_prefix_it = fs::directory_iterator{cache_base_path}; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) + for (auto key_prefix_it = fs::directory_iterator{cache_base_path}; key_prefix_it != fs::directory_iterator();) { const fs::path key_prefix_directory = key_prefix_it->path(); + key_prefix_it++; - if (!key_prefix_it->is_directory()) + if (!fs::is_directory(key_prefix_directory)) { if (key_prefix_directory.filename() != "status") { LOG_WARNING( log, "Unexpected file {} (not a directory), will skip it", - key_prefix_it->path().string()); + key_prefix_directory.string()); } continue; } if (fs::is_empty(key_prefix_directory)) { + LOG_DEBUG(log, "Removing empty key prefix directory: {}", key_prefix_directory.string()); fs::remove(key_prefix_directory); continue; } - fs::directory_iterator key_it{}; - for (; key_it != fs::directory_iterator(); ++key_it) + for (fs::directory_iterator key_it{key_prefix_directory}; key_it != fs::directory_iterator();) { const fs::path key_directory = key_it->path(); + ++key_it; - if (!key_it->is_directory()) + if (!fs::is_directory(key_directory)) { LOG_DEBUG( log, "Unexpected file: {} (not a directory). 
Expected a directory", - key_it->path().string()); + key_directory.string()); continue; } if (fs::is_empty(key_directory)) { + LOG_DEBUG(log, "Removing empty key directory: {}", key_directory.string()); fs::remove(key_directory); continue; } @@ -932,7 +936,7 @@ void FileCache::loadMetadata() log, "Cache capacity changed (max size: {}, used: {}), " "cached file `{}` does not fit in cache anymore (size: {})", - queue.getSizeLimit(), queue.getSize(), key_it->path().string(), size); + queue.getSizeLimit(), queue.getSize(), key_directory.string(), size); fs::remove(offset_it->path()); } @@ -961,46 +965,66 @@ void FileCache::loadMetadata() LockedKeyMetadataPtr FileCache::lockKeyMetadata(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool is_initial_load) { - auto lock = metadata.lock(); - - auto it = metadata.find(key); - if (it == metadata.end()) + KeyMetadataPtr key_metadata; { + auto lock = metadata.lock(); + + auto it = metadata.find(key); + if (it == metadata.end()) + { + if (key_not_found_policy == KeyNotFoundPolicy::THROW) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key.toString()); + else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL) + return nullptr; + + it = metadata.emplace( + key, + std::make_shared(/* base_directory_already_exists */is_initial_load, metadata.getCleanupQueue())).first; + } + + key_metadata = it->second; + } + + { + auto key_lock = key_metadata->lock(); + + const auto cleanup_state = key_metadata->getCleanupState(key_lock); + + if (cleanup_state == KeyMetadata::CleanupState::NOT_SUBMITTED) + { + return std::make_unique(key, key_metadata, std::move(key_lock), getPathInLocalCache(key)); + } + if (key_not_found_policy == KeyNotFoundPolicy::THROW) throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key.toString()); else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL) return nullptr; - it = metadata.emplace(key, std::make_shared(/* created_base_directory 
*/is_initial_load)).first; + if (cleanup_state == KeyMetadata::CleanupState::SUBMITTED_TO_CLEANUP_QUEUE) + { + key_metadata->removeFromCleanupQueue(key, key_lock); + return std::make_unique(key, key_metadata, std::move(key_lock), getPathInLocalCache(key)); + } + + chassert(cleanup_state == KeyMetadata::CleanupState::CLEANED_BY_CLEANUP_THREAD); + chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY); } - auto key_metadata = it->second; - auto key_lock = key_metadata->lock(); - - if (key_metadata->inCleanupQueue(key_lock)) - { - /// No race is guaranteed because KeyGuard::Lock and CacheMetadataGuard::Lock are hold. - metadata.getCleanupQueue().remove(key); - - if (key_not_found_policy == KeyNotFoundPolicy::THROW) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key.toString()); - else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL) - return nullptr; - } - - return std::make_unique( - key, key_metadata, std::move(key_lock), getPathInLocalCache(key), metadata.getCleanupQueue()); + /// Now we are in the case: + /// cleanup_state == KeyMetadata::CleanupState::CLEANED_BY_CLEANUP_THREAD + /// and KeyNotFoundPolicy == CREATE_EMPTY + /// Retry. 
+ return lockKeyMetadata(key, key_not_found_policy); } LockedKeyMetadataPtr FileCache::lockKeyMetadata(const Key & key, KeyMetadataPtr key_metadata) const { auto key_lock = key_metadata->lock(); - if (key_metadata->inCleanupQueue(key_lock)) + if (key_metadata->getCleanupState(key_lock) != KeyMetadata::CleanupState::NOT_SUBMITTED) throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot lock key: it was removed from cache"); - return std::make_unique( - key, key_metadata, std::move(key_lock), getPathInLocalCache(key), metadata.getCleanupQueue()); + return std::make_unique(key, key_metadata, std::move(key_lock), getPathInLocalCache(key)); } void FileCache::iterateCacheMetadata(const CacheMetadataGuard::Lock &, std::function && func) @@ -1010,18 +1034,28 @@ void FileCache::iterateCacheMetadata(const CacheMetadataGuard::Lock &, std::func { auto key_lock = key_metadata->lock(); - if (key_metadata->inCleanupQueue(key_lock)) + if (key_metadata->getCleanupState(key_lock) != KeyMetadata::CleanupState::NOT_SUBMITTED) continue; func(*key_metadata); } } +FileCache::~FileCache() +{ + cleanup_task->deactivate(); +} + +void FileCache::cleanup() +{ + metadata.doCleanup(); +} + void FileCache::cleanupThreadFunc() { try { - metadata.doCleanup(); + cleanup(); } catch (...) { diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 2b94ac4fb27..f7a790f0c1a 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -46,6 +46,8 @@ public: FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_); + ~FileCache(); + void initialize(); const String & getBasePath() const { return cache_base_path; } @@ -112,6 +114,8 @@ public: LockedKeyMetadataPtr lockKeyMetadata(const Key & key, KeyMetadataPtr key_metadata) const; + void cleanup(); + /// For per query cache limit. 
struct QueryContextHolder : private boost::noncopyable { diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 01bab4f35c7..a68d4b3e5ec 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include namespace fs = std::filesystem; @@ -103,35 +104,27 @@ std::string KeyMetadata::toString() const return result; } -void CleanupQueue::add(const FileCacheKey & key) +void KeyMetadata::addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &) { - std::lock_guard lock(mutex); - keys.insert(key); + cleanup_queue.add(key); + cleanup_state = CleanupState::SUBMITTED_TO_CLEANUP_QUEUE; } -void CleanupQueue::remove(const FileCacheKey & key) +void KeyMetadata::removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &) { - std::lock_guard lock(mutex); - bool erased = keys.erase(key); - if (!erased) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key to erase: {}", key.toString()); -} - -bool CleanupQueue::tryPop(FileCacheKey & key) -{ - std::lock_guard lock(mutex); - if (keys.empty()) - return false; - auto it = keys.begin(); - key = *it; - keys.erase(it); - return true; + cleanup_queue.remove(key); + cleanup_state = CleanupState::NOT_SUBMITTED; } void CacheMetadata::doCleanup() { auto lock = guard.lock(); + LOG_INFO( + &Poco::Logger::get("FileCacheCleanupThread"), + "Performing background cleanup (size: {})", + cleanup_queue.getSize()); + /// Let's mention this case. 
/// This metadata cleanup is delayed so what is we marked key as deleted and /// put it to deletion queue, but then the same key was added to cache before @@ -146,6 +139,15 @@ void CacheMetadata::doCleanup() if (it == end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in metadata", cleanup_key.toString()); + auto key_metadata = it->second; + auto key_lock = key_metadata->lock(); + /// As in lockKeyMetadata we extract key metadata from cache metadata + /// under CacheMetadataGuard::Lock, but take KeyGuard::Lock only after we + /// have released CacheMetadataGuard::Lock, so we must take that into + /// account here. + if (key_metadata->getCleanupState(key_lock) == KeyMetadata::CleanupState::NOT_SUBMITTED) + continue; + erase(it); try @@ -170,25 +172,21 @@ LockedKeyMetadata::LockedKeyMetadata( const FileCacheKey & key_, std::shared_ptr key_metadata_, KeyGuard::Lock && lock_, - const std::string & key_path_, - CleanupQueue & cleanup_keys_metadata_queue_) + const std::string & key_path_) : key(key_) , key_path(key_path_) , key_metadata(key_metadata_) , lock(std::move(lock_)) - , cleanup_keys_metadata_queue(cleanup_keys_metadata_queue_) , log(&Poco::Logger::get("LockedKeyMetadata")) { } LockedKeyMetadata::~LockedKeyMetadata() { - /// Someone might still need this directory. 
if (!key_metadata->empty()) return; - cleanup_keys_metadata_queue.add(key); - key_metadata->in_cleanup_queue = true; + key_metadata->addToCleanupQueue(key, lock); } void LockedKeyMetadata::createKeyDirectoryIfNot() @@ -269,4 +267,40 @@ void LockedKeyMetadata::shrinkFileSegmentToDownloadedSize( assert(file_segment_metadata->size() == entry.size); } +void CleanupQueue::add(const FileCacheKey & key) +{ + std::lock_guard lock(mutex); + auto [_, inserted] = keys.insert(key); + if (!inserted) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, "Key {} is already in removal queue", key.toString()); + } +} + +void CleanupQueue::remove(const FileCacheKey & key) +{ + std::lock_guard lock(mutex); + bool erased = keys.erase(key); + if (!erased) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in removal queue", key.toString()); +} + +bool CleanupQueue::tryPop(FileCacheKey & key) +{ + std::lock_guard lock(mutex); + if (keys.empty()) + return false; + auto it = keys.begin(); + key = *it; + keys.erase(it); + return true; +} + +size_t CleanupQueue::getSize() const +{ + std::lock_guard lock(mutex); + return keys.size(); +} + } diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h index d6485165d88..07a719e05ef 100644 --- a/src/Interpreters/Cache/Metadata.h +++ b/src/Interpreters/Cache/Metadata.h @@ -11,6 +11,7 @@ using FileSegmentPtr = std::shared_ptr; struct LockedKeyMetadata; class LockedCachePriority; struct KeysQueue; +struct CleanupQueue; struct FileSegmentMetadata : private boost::noncopyable @@ -36,12 +37,12 @@ struct FileSegmentMetadata : private boost::noncopyable : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} }; - struct KeyMetadata : public std::map, private boost::noncopyable { friend struct LockedKeyMetadata; public: - explicit KeyMetadata(bool created_base_directory_) : created_base_directory(created_base_directory_) {} + explicit KeyMetadata(bool created_base_directory_, 
CleanupQueue & cleanup_queue_) + : created_base_directory(created_base_directory_), cleanup_queue(cleanup_queue_) {} const FileSegmentMetadata * getByOffset(size_t offset) const; FileSegmentMetadata * getByOffset(size_t offset); @@ -55,30 +56,41 @@ public: bool createdBaseDirectory(const KeyGuard::Lock &) const { return created_base_directory; } - bool inCleanupQueue(const KeyGuard::Lock &) const { return in_cleanup_queue; } + enum class CleanupState + { + NOT_SUBMITTED, + SUBMITTED_TO_CLEANUP_QUEUE, + CLEANED_BY_CLEANUP_THREAD, + }; + + CleanupState getCleanupState(const KeyGuard::Lock &) const { return cleanup_state; } + + void addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &); + + void removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &); private: mutable KeyGuard guard; bool created_base_directory = false; - bool in_cleanup_queue = false; + CleanupState cleanup_state = CleanupState::NOT_SUBMITTED; + CleanupQueue & cleanup_queue; }; using KeyMetadataPtr = std::shared_ptr; - struct CleanupQueue { friend struct CacheMetadata; public: void add(const FileCacheKey & key); - void remove(const FileCacheKey & key); + size_t getSize() const; private: bool tryPop(FileCacheKey & key); std::unordered_set keys; - std::mutex mutex; + mutable std::mutex mutex; }; struct CacheMetadata : public std::unordered_map, private boost::noncopyable @@ -88,19 +100,21 @@ public: CacheMetadataGuard::Lock lock() { return guard.lock(); } - CleanupQueue & getCleanupQueue() const { return cleanup_queue; } - - void removeFromCleanupQueue(const FileCacheKey & key, const CacheMetadataGuard::Lock &) const; - void doCleanup(); + CleanupQueue & getCleanupQueue() { return cleanup_queue; } + private: + void addToCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &); + void removeFromCleanupQueue(const FileCacheKey & key, const KeyGuard::Lock &); + const std::string base_directory; CacheMetadataGuard guard; - mutable CleanupQueue cleanup_queue; + 
CleanupQueue cleanup_queue; }; + /** * `LockedKeyMetadata` is an object which makes sure that as long as it exists the following is true: * 1. the key cannot be removed from cache @@ -119,8 +133,7 @@ struct LockedKeyMetadata : private boost::noncopyable const FileCacheKey & key_, std::shared_ptr key_metadata_, KeyGuard::Lock && key_lock_, - const std::string & key_path_, - CleanupQueue & cleanup_keys_metadata_queue_); + const std::string & key_path_); ~LockedKeyMetadata(); @@ -138,10 +151,9 @@ struct LockedKeyMetadata : private boost::noncopyable private: const FileCacheKey key; - const std::string & key_path; + const std::string key_path; const std::shared_ptr key_metadata; KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`. - CleanupQueue & cleanup_keys_metadata_queue; Poco::Logger * log; }; diff --git a/src/Interpreters/tests/gtest_lru_file_cache.cpp b/src/Interpreters/tests/gtest_lru_file_cache.cpp index bfd76d3a4bd..93df029a592 100644 --- a/src/Interpreters/tests/gtest_lru_file_cache.cpp +++ b/src/Interpreters/tests/gtest_lru_file_cache.cpp @@ -16,6 +16,9 @@ #include #include #include +#include +#include +#include #include #include @@ -63,6 +66,7 @@ using HolderPtr = FileSegmentsHolderPtr; fs::path caches_dir = fs::current_path() / "lru_cache_test"; std::string cache_base_path = caches_dir / "cache1" / ""; + void assertEqual(const HolderPtr & holder, const Ranges & expected_ranges, const States & expected_states = {}) { std::cerr << "Holder: " << holder->toString() << "\n"; @@ -131,7 +135,6 @@ void download(const HolderPtr & holder) class FileCacheTest : public ::testing::Test { public: - static void setupLogs(const std::string & level) { Poco::AutoPtr channel(new Poco::ConsoleChannel(std::cerr)); @@ -165,6 +168,14 @@ TEST_F(FileCacheTest, get) /// To work with cache need query_id and query context. 
std::string query_id = "query_id"; + + Poco::XML::DOMParser dom_parser; + std::string xml(R"CONFIG( +)CONFIG"); + Poco::AutoPtr document = dom_parser.parseString(xml); + Poco::AutoPtr config = new Poco::Util::XMLConfiguration(document); + getMutableContext().context->setConfig(config); + auto query_context = DB::Context::createCopy(getContext().context); query_context->makeQueryContext(); query_context->setCurrentQueryId(query_id); @@ -178,23 +189,36 @@ TEST_F(FileCacheTest, get) std::cerr << "Step 1\n"; auto cache = FileCache(cache_base_path, settings); + std::cerr << "Step 1\n"; cache.initialize(); + std::cerr << "Step 1\n"; auto key = cache.createKeyForPath("key1"); + std::cerr << "Step 1\n"; { + std::cerr << "Step 1\n"; auto holder = cache.getOrSet(key, 0, 10, {}); /// Add range [0, 9] + std::cerr << "Step 1\n"; assertEqual(holder, { Range(0, 9) }, { State::EMPTY }); + std::cerr << "Step 1\n"; download(holder->front()); + std::cerr << "Step 1\n"; assertEqual(holder, { Range(0, 9) }, { State::DOWNLOADED }); + std::cerr << "Step 1\n"; } /// Current cache: [__________] /// ^ ^ /// 0 9 + std::cerr << "Step 1\n"; assertEqual(cache.getSnapshot(key), { Range(0, 9) }); + std::cerr << "Step 1\n"; assertEqual(cache.dumpQueue(), { Range(0, 9) }); + std::cerr << "Step 1\n"; ASSERT_EQ(cache.getFileSegmentsNum(), 1); + std::cerr << "Step 1\n"; ASSERT_EQ(cache.getUsedCacheSize(), 10); + std::cerr << "Step 1\n"; std::cerr << "Step 2\n"; @@ -520,6 +544,60 @@ TEST_F(FileCacheTest, get) { State::EMPTY, State::EMPTY, State::EMPTY }); } + std::cerr << "Step 13\n"; + { + /// Test delayed cleanup + + auto cache = FileCache(cache_base_path, settings); + cache.initialize(); + cache.cleanup(); + const auto key = cache.createKeyForPath("key10"); + const auto key_path = cache.getPathInLocalCache(key); + + cache.removeAllReleasable(); + ASSERT_EQ(cache.getUsedCacheSize(), 0); + ASSERT_TRUE(!fs::exists(key_path)); + ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path())); + + 
download(cache.getOrSet(key, 0, 10, {})); + ASSERT_EQ(cache.getUsedCacheSize(), 10); + ASSERT_TRUE(fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular))); + + cache.removeAllReleasable(); + ASSERT_EQ(cache.getUsedCacheSize(), 0); + ASSERT_TRUE(fs::exists(key_path)); + ASSERT_TRUE(!fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular))); + + cache.cleanup(); + ASSERT_TRUE(!fs::exists(key_path)); + ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path())); + } + + std::cerr << "Step 14\n"; + { + /// Test background thread delayed cleanup + + auto settings2{settings}; + settings2.delayed_cleanup_interval_ms = 0; + auto cache = FileCache(cache_base_path, settings2); + cache.initialize(); + const auto key = cache.createKeyForPath("key10"); + const auto key_path = cache.getPathInLocalCache(key); + + cache.removeAllReleasable(); + ASSERT_EQ(cache.getUsedCacheSize(), 0); + ASSERT_TRUE(!fs::exists(key_path)); + ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path())); + + download(cache.getOrSet(key, 0, 10, {})); + ASSERT_EQ(cache.getUsedCacheSize(), 10); + ASSERT_TRUE(fs::exists(key_path)); + + cache.removeAllReleasable(); + ASSERT_EQ(cache.getUsedCacheSize(), 0); + sleepForSeconds(2); + ASSERT_TRUE(!fs::exists(key_path)); + } } TEST_F(FileCacheTest, writeBuffer)