diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 970919d1514..b3dee29e02f 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -194,6 +194,7 @@ M(FilesystemCacheSizeLimit, "Filesystem cache size limit in bytes") \ M(FilesystemCacheElements, "Filesystem cache elements (file segments)") \ M(FilesystemCacheDownloadQueueElements, "Filesystem cache elements in download queue") \ + M(FilesystemCacheDelayedCleanupElements, "Filesystem cache elements in background cleanup queue") \ M(AsyncInsertCacheSize, "Number of async insert hash id in cache") \ M(S3Requests, "S3 requests") \ M(KeeperAliveConnections, "Number of alive connections") \ diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index b7b36cebf83..2a445cffff9 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "Common/ThreadPool_fwd.h" #include #include #include @@ -54,7 +55,6 @@ namespace ErrorCodes FileCache::FileCache(const FileCacheSettings & settings) : max_file_segment_size(settings.max_file_segment_size) , bypass_cache_threshold(settings.enable_bypass_cache_with_threashold ? settings.bypass_cache_threashold : 0) - , delayed_cleanup_interval_ms(settings.delayed_cleanup_interval_ms) , boundary_alignment(settings.boundary_alignment) , background_download_threads(settings.background_download_threads) , log(&Poco::Logger::get("FileCache")) @@ -134,9 +134,7 @@ void FileCache::initialize() for (size_t i = 0; i < background_download_threads; ++i) download_threads.emplace_back([this] { metadata.downloadThreadFunc(); }); - cleanup_task = Context::getGlobalContextInstance()->getSchedulePool().createTask("FileCacheCleanup", [this]{ cleanupThreadFunc(); }); - cleanup_task->activate(); - cleanup_task->scheduleAfter(delayed_cleanup_interval_ms); + cleanup_thread = std::make_unique(std::function{ [this]{ metadata.cleanupThreadFunc(); }}); } CacheGuard::Lock FileCache::lockCache() const @@ -1028,33 +1026,14 @@ FileCache::~FileCache() void FileCache::deactivateBackgroundOperations() { - if (cleanup_task) - cleanup_task->deactivate(); - metadata.cancelDownload(); for (auto & thread : download_threads) if (thread.joinable()) thread.join(); -} -void FileCache::cleanup() -{ - metadata.doCleanup(); -} - -void FileCache::cleanupThreadFunc() -{ - try - { - cleanup(); - } - catch (...) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - chassert(false); - } - - cleanup_task->scheduleAfter(delayed_cleanup_interval_ms); + metadata.cancelCleanup(); + if (cleanup_thread && cleanup_thread->joinable()) + cleanup_thread->join(); } FileSegmentsHolderPtr FileCache::getSnapshot() diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index d020f6d35f7..de0923bdbd1 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -12,7 +12,7 @@ #include -#include +#include #include #include #include @@ -130,8 +130,6 @@ public: FileSegmentsHolderPtr dumpQueue(); - void cleanup(); - void deactivateBackgroundOperations(); /// For per query cache limit. @@ -157,7 +155,6 @@ private: const size_t max_file_segment_size; const size_t bypass_cache_threshold = 0; - const size_t delayed_cleanup_interval_ms; const size_t boundary_alignment; const size_t background_download_threads; @@ -202,9 +199,8 @@ private: * A background cleanup task. * Clears removed cache entries from metadata. */ - BackgroundSchedulePool::TaskHolder cleanup_task; - std::vector download_threads; + std::unique_ptr cleanup_thread; void assertInitialized() const; @@ -235,8 +231,6 @@ private: FileSegment::State state, const CreateFileSegmentSettings & create_settings, const CacheGuard::Lock *); - - void cleanupThreadFunc(); }; } diff --git a/src/Interpreters/Cache/FileCacheSettings.cpp b/src/Interpreters/Cache/FileCacheSettings.cpp index 455e9b44d0b..426bbbc6ee9 100644 --- a/src/Interpreters/Cache/FileCacheSettings.cpp +++ b/src/Interpreters/Cache/FileCacheSettings.cpp @@ -49,8 +49,6 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & if (config.has(config_prefix + ".background_download_threads")) background_download_threads = config.getUInt(config_prefix + ".background_download_threads"); - - delayed_cleanup_interval_ms = config.getUInt64(config_prefix + ".delayed_cleanup_interval_ms", FILECACHE_DELAYED_CLEANUP_INTERVAL_MS); } } diff --git a/src/Interpreters/Cache/FileCacheSettings.h b/src/Interpreters/Cache/FileCacheSettings.h index e56d6fcc54d..fab3e8744f6 100644 --- a/src/Interpreters/Cache/FileCacheSettings.h +++ b/src/Interpreters/Cache/FileCacheSettings.h @@ -24,7 +24,6 @@ struct FileCacheSettings bool enable_bypass_cache_with_threashold = false; size_t bypass_cache_threashold = FILECACHE_BYPASS_THRESHOLD; - size_t delayed_cleanup_interval_ms = FILECACHE_DELAYED_CLEANUP_INTERVAL_MS; size_t boundary_alignment = FILECACHE_DEFAULT_FILE_SEGMENT_ALIGNMENT; size_t background_download_threads = FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS; diff --git a/src/Interpreters/Cache/FileCache_fwd.h b/src/Interpreters/Cache/FileCache_fwd.h index 6cb2a53684d..939cb676e3a 100644 --- a/src/Interpreters/Cache/FileCache_fwd.h +++ b/src/Interpreters/Cache/FileCache_fwd.h @@ -10,8 +10,6 @@ static constexpr int FILECACHE_DEFAULT_BACKGROUND_DOWNLOAD_THREADS = 2; static constexpr int FILECACHE_DEFAULT_MAX_ELEMENTS = 10000000; static constexpr int FILECACHE_DEFAULT_HITS_THRESHOLD = 0; static constexpr size_t FILECACHE_BYPASS_THRESHOLD = 256 * 1024 * 1024; -static constexpr size_t FILECACHE_DELAYED_CLEANUP_INTERVAL_MS = 1000 * 60; /// 1 min -static constexpr size_t FILECACHE_DELAYED_CLEANUP_BATCH_SIZE = 1000; class FileCache; using FileCachePtr = std::shared_ptr; diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index 13a158ce35d..8fcb59c0259 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -10,6 +10,7 @@ namespace fs = std::filesystem; namespace CurrentMetrics { extern const Metric FilesystemCacheDownloadQueueElements; + extern const Metric FilesystemCacheDelayedCleanupElements; } namespace ProfileEvents @@ -134,22 +135,6 @@ std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) } -class CleanupQueue -{ - friend struct CacheMetadata; -public: - void add(const FileCacheKey & key); - void remove(const FileCacheKey & key); - size_t getSize() const; - -private: - bool tryPop(FileCacheKey & key); - - std::unordered_set keys; - mutable std::mutex mutex; -}; - - CacheMetadata::CacheMetadata(const std::string & path_) : path(path_) , cleanup_queue(std::make_unique()) @@ -269,37 +254,6 @@ void CacheMetadata::iterate(IterateFunc && func) } } -void CacheMetadata::doCleanup() -{ - /// Firstly, this cleanup does not delete cache files, - /// but only empty keys from cache_metadata_map and key (prefix) directories from fs. - /// Secondly, it deletes those only if arised as a result of - /// (1) eviction in FileCache::tryReserve(); - /// (2) removal of cancelled non-downloaded file segments after FileSegment::complete(). - /// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys, - /// triggered by removal of source files from objects storage. - /// E.g. number of elements submitted to background cleanup should remain low. - - auto lock = lockMetadata(); - LOG_DEBUG(log, "Having {} keys to delete", cleanup_queue->getSize()); - - FileCacheKey cleanup_key; - size_t remaining_remove_num = FILECACHE_DELAYED_CLEANUP_BATCH_SIZE; - while (remaining_remove_num && cleanup_queue->tryPop(cleanup_key)) - { - auto it = find(cleanup_key); - if (it == end()) - continue; - - auto locked_key = it->second->lockNoStateCheck(); - if (locked_key->getKeyState() == KeyMetadata::KeyState::REMOVING) - { - removeKeyImpl(it, *locked_key, lock); - --remaining_remove_num; - } - } -} - void CacheMetadata::removeAllKeys(bool if_releasable) { auto lock = lockMetadata(); @@ -387,6 +341,86 @@ CacheMetadata::iterator CacheMetadata::removeKeyImpl(iterator it, LockedKey & lo return next_it; } +class CleanupQueue +{ + friend struct CacheMetadata; +public: + void add(const FileCacheKey & key) + { + { + std::lock_guard lock(mutex); + keys.insert(key); + } + CurrentMetrics::add(CurrentMetrics::FilesystemCacheDelayedCleanupElements); + cv.notify_one(); + } + + void cancel() + { + { + std::lock_guard lock(mutex); + cancelled = true; + } + cv.notify_all(); + } + +private: + std::unordered_set keys; + mutable std::mutex mutex; + std::condition_variable cv; + bool cancelled = false; +}; + +void CacheMetadata::cleanupThreadFunc() +{ + while (true) + { + Key key; + { + std::unique_lock lock(cleanup_queue->mutex); + + if (cleanup_queue->cancelled) + return; + + auto & keys = cleanup_queue->keys; + if (keys.empty()) + { + cleanup_queue->cv.wait(lock); + continue; + } + + auto it = keys.begin(); + key = *it; + keys.erase(it); + } + + CurrentMetrics::sub(CurrentMetrics::FilesystemCacheDelayedCleanupElements); + + try + { + auto lock = lockMetadata(); + + auto it = find(key); + if (it == end()) + continue; + + auto locked_key = it->second->lockNoStateCheck(); + if (locked_key->getKeyState() == KeyMetadata::KeyState::REMOVING) + { + removeKeyImpl(it, *locked_key, lock); + } + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + +void CacheMetadata::cancelCleanup() +{ + cleanup_queue->cancel(); +} class DownloadQueue { @@ -814,35 +848,4 @@ std::string LockedKey::toString() const return result; } -void CleanupQueue::add(const FileCacheKey & key) -{ - std::lock_guard lock(mutex); - keys.insert(key); -} - -void CleanupQueue::remove(const FileCacheKey & key) -{ - std::lock_guard lock(mutex); - bool erased = keys.erase(key); - if (!erased) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key {} in removal queue", key); -} - -bool CleanupQueue::tryPop(FileCacheKey & key) -{ - std::lock_guard lock(mutex); - if (keys.empty()) - return false; - auto it = keys.begin(); - key = *it; - keys.erase(it); - return true; -} - -size_t CleanupQueue::getSize() const -{ - std::lock_guard lock(mutex); - return keys.size(); -} - } diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h index 9e5fc20d9e2..caf6563aa9d 100644 --- a/src/Interpreters/Cache/Metadata.h +++ b/src/Interpreters/Cache/Metadata.h @@ -124,7 +124,17 @@ public: void removeKey(const Key & key, bool if_exists, bool is_releasable); void removeAllKeys(bool is_releasable); - void doCleanup(); + void cancelCleanup(); + + /// Firstly, this cleanup does not delete cache files, + /// but only empty keys from cache_metadata_map and key (prefix) directories from fs. + /// Secondly, it deletes those only if arised as a result of + /// (1) eviction in FileCache::tryReserve(); + /// (2) removal of cancelled non-downloaded file segments after FileSegment::complete(). + /// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys, + /// triggered by removal of source files from objects storage. + /// E.g. number of elements submitted to background cleanup should remain low. + void cleanupThreadFunc(); void downloadThreadFunc(); diff --git a/src/Interpreters/InterpreterDescribeCacheQuery.cpp b/src/Interpreters/InterpreterDescribeCacheQuery.cpp index 7822ecdb8be..f9c61afb4d8 100644 --- a/src/Interpreters/InterpreterDescribeCacheQuery.cpp +++ b/src/Interpreters/InterpreterDescribeCacheQuery.cpp @@ -25,7 +25,6 @@ static Block getSampleBlock() ColumnWithTypeAndName{std::make_shared(), "current_size"}, ColumnWithTypeAndName{std::make_shared(), "current_elements"}, ColumnWithTypeAndName{std::make_shared(), "path"}, - ColumnWithTypeAndName{std::make_shared>(), "delayed_cleanup_interval_ms"}, ColumnWithTypeAndName{std::make_shared>(), "background_download_threads"}, ColumnWithTypeAndName{std::make_shared>(), "enable_bypass_cache_with_threshold"}, }; @@ -54,7 +53,6 @@ BlockIO InterpreterDescribeCacheQuery::execute() res_columns[i++]->insert(cache->getUsedCacheSize()); res_columns[i++]->insert(cache->getFileSegmentsNum()); res_columns[i++]->insert(cache->getBasePath()); - res_columns[i++]->insert(settings.delayed_cleanup_interval_ms); res_columns[i++]->insert(settings.background_download_threads); res_columns[i++]->insert(settings.enable_bypass_cache_with_threashold); diff --git a/src/Interpreters/tests/gtest_lru_file_cache.cpp b/src/Interpreters/tests/gtest_lru_file_cache.cpp index dab14a66ed7..cbaf3733464 100644 --- a/src/Interpreters/tests/gtest_lru_file_cache.cpp +++ b/src/Interpreters/tests/gtest_lru_file_cache.cpp @@ -604,7 +604,6 @@ TEST_F(FileCacheTest, get) auto cache = FileCache(settings); cache.initialize(); - cache.cleanup(); const auto key = cache.createKeyForPath("key10"); const auto key_path = cache.getPathInLocalCache(key); @@ -622,7 +621,6 @@ TEST_F(FileCacheTest, get) ASSERT_TRUE(fs::exists(key_path)); ASSERT_TRUE(!fs::exists(cache.getPathInLocalCache(key, 0, FileSegmentKind::Regular))); - cache.cleanup(); ASSERT_TRUE(!fs::exists(key_path)); ASSERT_TRUE(!fs::exists(fs::path(key_path).parent_path())); } @@ -632,7 +630,6 @@ TEST_F(FileCacheTest, get) /// Test background thread delated cleanup auto settings2{settings}; - settings2.delayed_cleanup_interval_ms = 0; auto cache = DB::FileCache(settings2); cache.initialize(); const auto key = cache.createKeyForPath("key10");