From e64166411580973d371ed5b3df3da7af4743cefa Mon Sep 17 00:00:00 2001 From: kssenii Date: Fri, 19 Aug 2022 18:15:18 +0200 Subject: [PATCH] Refactor --- src/Common/FileCache.cpp | 262 +++++++++++++++++++-------------------- src/Common/FileCache.h | 242 +++++++++++++++++++----------------- 2 files changed, 253 insertions(+), 251 deletions(-) diff --git a/src/Common/FileCache.cpp b/src/Common/FileCache.cpp index 0f2c4559177..1aa8a25bb79 100644 --- a/src/Common/FileCache.cpp +++ b/src/Common/FileCache.cpp @@ -30,12 +30,12 @@ FileCache::FileCache( , max_element_size(cache_settings_.max_elements) , max_file_segment_size(cache_settings_.max_file_segment_size) , allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files) + , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold) , enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit) + , log(&Poco::Logger::get("FileCache")) , main_priority(std::make_unique()) , stash_priority(std::make_unique()) , max_stash_element_size(cache_settings_.max_elements) - , enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold) - , log(&Poco::Logger::get("FileCache")) { } @@ -77,132 +77,6 @@ void FileCache::assertInitialized() const throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, "Cache not initialized"); } -FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard & cache_lock) -{ - if (!isQueryInitialized()) - return nullptr; - - return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock); -} - -FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard & /* cache_lock */) -{ - auto query_iter = query_map.find(query_id); - return (query_iter == query_map.end()) ? nullptr : query_iter->second; -} - -void FileCache::removeQueryContext(const String & query_id) -{ - std::lock_guard cache_lock(mutex); - auto query_iter = query_map.find(query_id); - - if (query_iter == query_map.end()) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Attempt to release query context that does not exist (query_id: {})", - query_id); - } - - query_map.erase(query_iter); -} - -FileCache::QueryContextPtr FileCache::getOrSetQueryContext( - const String & query_id, const ReadSettings & settings, std::lock_guard & cache_lock) -{ - if (query_id.empty()) - return nullptr; - - auto context = getQueryContext(query_id, cache_lock); - if (context) - return context; - - auto query_context = std::make_shared(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); - auto query_iter = query_map.emplace(query_id, query_context).first; - return query_iter->second; -} - -FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings) -{ - std::lock_guard cache_lock(mutex); - - if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0) - return {}; - - /// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero, - /// we create context query for current query. - auto context = getOrSetQueryContext(query_id, settings, cache_lock); - return QueryContextHolder(query_id, this, context); -} - -void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) -{ - if (cache_size < size) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size"); - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record != records.end()) - { - record->second->removeAndGetNext(cache_lock); - records.erase({key, offset}); - } - } - cache_size -= size; -} - -void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) -{ - if (cache_size + size > max_cache_size) - { - throw Exception( - ErrorCodes::LOGICAL_ERROR, - "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", - key.toString(), offset); - } - - if (!skip_download_if_exceeds_query_cache) - { - auto record = records.find({key, offset}); - if (record == records.end()) - { - auto queue_iter = priority->add(key, offset, 0, cache_lock); - record = records.insert({{key, offset}, queue_iter}).first; - } - record->second->incrementSize(size, cache_lock); - } - cache_size += size; -} - -void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard & cache_lock) -{ - if (skip_download_if_exceeds_query_cache) - return; - - auto record = records.find({key, offset}); - if (record != records.end()) - record->second->use(cache_lock); -} - -FileCache::QueryContextHolder::QueryContextHolder( - const String & query_id_, - FileCache * cache_, - FileCache::QueryContextPtr context_) - : query_id(query_id_) - , cache(cache_) - , context(context_) -{ -} - -FileCache::QueryContextHolder::~QueryContextHolder() -{ - /// If only the query_map and the current holder hold the context_query, - /// the query has been completed and the query_context is released. - if (context && context.use_count() == 2) - cache->removeQueryContext(query_id); -} - void FileCache::initialize() { std::lock_guard cache_lock(mutex); @@ -1222,12 +1096,6 @@ size_t FileCache::getUsedCacheSizeUnlocked(std::lock_guard & cache_l return main_priority->getCacheSize(cache_lock); } -size_t FileCache::getAvailableCacheSize() const -{ - std::lock_guard cache_lock(mutex); - return getAvailableCacheSizeUnlocked(cache_lock); -} - size_t FileCache::getAvailableCacheSizeUnlocked(std::lock_guard & cache_lock) const { return max_size - getUsedCacheSizeUnlocked(cache_lock); @@ -1346,4 +1214,130 @@ void FileCache::assertPriorityCorrectness(std::lock_guard & cache_lo assert(main_priority->getElementsNum(cache_lock) <= max_element_size); } +FileCache::QueryContextPtr FileCache::getCurrentQueryContext(std::lock_guard & cache_lock) +{ + if (!isQueryInitialized()) + return nullptr; + + return getQueryContext(std::string(CurrentThread::getQueryId()), cache_lock); +} + +FileCache::QueryContextPtr FileCache::getQueryContext(const String & query_id, std::lock_guard & /* cache_lock */) +{ + auto query_iter = query_map.find(query_id); + return (query_iter == query_map.end()) ? nullptr : query_iter->second; +} + +void FileCache::removeQueryContext(const String & query_id) +{ + std::lock_guard cache_lock(mutex); + auto query_iter = query_map.find(query_id); + + if (query_iter == query_map.end()) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Attempt to release query context that does not exist (query_id: {})", + query_id); + } + + query_map.erase(query_iter); +} + +FileCache::QueryContextPtr FileCache::getOrSetQueryContext( + const String & query_id, const ReadSettings & settings, std::lock_guard & cache_lock) +{ + if (query_id.empty()) + return nullptr; + + auto context = getQueryContext(query_id, cache_lock); + if (context) + return context; + + auto query_context = std::make_shared(settings.max_query_cache_size, settings.skip_download_if_exceeds_query_cache); + auto query_iter = query_map.emplace(query_id, query_context).first; + return query_iter->second; +} + +FileCache::QueryContextHolder FileCache::getQueryContextHolder(const String & query_id, const ReadSettings & settings) +{ + std::lock_guard cache_lock(mutex); + + if (!enable_filesystem_query_cache_limit || settings.max_query_cache_size == 0) + return {}; + + /// if enable_filesystem_query_cache_limit is true, and max_query_cache_size large than zero, + /// we create context query for current query. + auto context = getOrSetQueryContext(query_id, settings, cache_lock); + return QueryContextHolder(query_id, this, context); +} + +void FileCache::QueryContext::remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) +{ + if (cache_size < size) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Deleted cache size exceeds existing cache size"); + + if (!skip_download_if_exceeds_query_cache) + { + auto record = records.find({key, offset}); + if (record != records.end()) + { + record->second->removeAndGetNext(cache_lock); + records.erase({key, offset}); + } + } + cache_size -= size; +} + +void FileCache::QueryContext::reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock) +{ + if (cache_size + size > max_cache_size) + { + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "Reserved cache size exceeds the remaining cache size (key: {}, offset: {})", + key.toString(), offset); + } + + if (!skip_download_if_exceeds_query_cache) + { + auto record = records.find({key, offset}); + if (record == records.end()) + { + auto queue_iter = priority->add(key, offset, 0, cache_lock); + record = records.insert({{key, offset}, queue_iter}).first; + } + record->second->incrementSize(size, cache_lock); + } + cache_size += size; +} + +void FileCache::QueryContext::use(const Key & key, size_t offset, std::lock_guard & cache_lock) +{ + if (skip_download_if_exceeds_query_cache) + return; + + auto record = records.find({key, offset}); + if (record != records.end()) + record->second->use(cache_lock); +} + +FileCache::QueryContextHolder::QueryContextHolder( + const String & query_id_, + FileCache * cache_, + FileCache::QueryContextPtr context_) + : query_id(query_id_) + , cache(cache_) + , context(context_) +{ +} + +FileCache::QueryContextHolder::~QueryContextHolder() +{ + /// If only the query_map and the current holder hold the context_query, + /// the query has been completed and the query_context is released. + if (context && context.use_count() == 2) + cache->removeQueryContext(query_id); +} + } diff --git a/src/Common/FileCache.h b/src/Common/FileCache.h index 5b368329c88..1690690d102 100644 --- a/src/Common/FileCache.h +++ b/src/Common/FileCache.h @@ -23,13 +23,17 @@ namespace DB { /// Local cache for remote filesystem files, represented as a set of non-overlapping non-empty file segments. -/// Different caching algorithms are implemented based on IFileCachePriority. +/// Different caching algorithms are implemented using IFileCachePriority. class FileCache : private boost::noncopyable { - friend class FileSegment; - friend class IFileCachePriority; - friend struct FileSegmentsHolder; - friend class FileSegmentRangeWriter; + +friend class FileSegment; +friend class IFileCachePriority; +friend struct FileSegmentsHolder; +friend class FileSegmentRangeWriter; + +struct QueryContext; +using QueryContextPtr = std::shared_ptr; public: using Key = DB::FileCacheKey; @@ -41,25 +45,8 @@ public: /// Restore cache from local filesystem. void initialize(); - void removeIfExists(const Key & key); - - void removeIfReleasable(); - - static bool isReadOnly(); - - /// Cache capacity in bytes. - size_t capacity() const { return max_size; } - - static Key hash(const String & path); - - String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const; - - String getPathInLocalCache(const Key & key) const; - const String & getBasePath() const { return cache_base_path; } - std::vector tryGetCachePaths(const Key & key); - /** * Given an `offset` and `size` representing [offset, offset + size) bytes interval, * return list of cached non-overlapping non-empty @@ -84,6 +71,28 @@ public: */ FileSegmentsHolder get(const Key & key, size_t offset, size_t size); + /// Remove files by `key`. Removes files which might be used at the moment. + void removeIfExists(const Key & key); + + /// Remove files by `key`. Will not remove files which are used at the moment. + void removeIfReleasable(); + + static Key hash(const String & path); + + String getPathInLocalCache(const Key & key, size_t offset, bool is_persistent) const; + + String getPathInLocalCache(const Key & key) const; + + std::vector tryGetCachePaths(const Key & key); + + size_t capacity() const { return max_size; } + + size_t getUsedCacheSize() const; + + size_t getFileSegmentsNum() const; + + static bool isReadOnly(); + /** * Create a file segment of exactly requested size with EMPTY state. * Throw exception if requested size exceeds max allowed file segment size. @@ -102,92 +111,6 @@ public: /// For debug. String dumpStructure(const Key & key); - size_t getUsedCacheSize() const; - - size_t getFileSegmentsNum() const; - -private: - String cache_base_path; - size_t max_size; - size_t max_element_size; - size_t max_file_segment_size; - bool allow_persistent_files; - - bool is_initialized = false; - - mutable std::mutex mutex; - - bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void remove(Key key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & segment_lock); - - bool isLastFileSegmentHolder( - const Key & key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & segment_lock); - - void reduceSizeToDownloaded( - const Key & key, size_t offset, std::lock_guard & cache_lock, std::lock_guard & /* segment_lock */); - - void assertInitialized() const; - - using AccessKeyAndOffset = std::pair; - struct KeyAndOffsetHash - { - std::size_t operator()(const AccessKeyAndOffset & key) const - { - return std::hash()(key.first.key) ^ std::hash()(key.second); - } - }; - - using FileCacheRecords = std::unordered_map; - - /// Used to track and control the cache access of each query. - /// Through it, we can realize the processing of different queries by the cache layer. - struct QueryContext - { - FileCacheRecords records; - FileCachePriorityPtr priority; - - size_t cache_size = 0; - size_t max_cache_size; - - bool skip_download_if_exceeds_query_cache; - - QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_) - : max_cache_size(max_cache_size_), skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) - { - } - - void remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); - - void use(const Key & key, size_t offset, std::lock_guard & cache_lock); - - size_t getMaxCacheSize() const { return max_cache_size; } - - size_t getCacheSize() const { return cache_size; } - - FileCachePriorityPtr getPriority() { return priority; } - - bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } - }; - - using QueryContextPtr = std::shared_ptr; - using QueryContextMap = std::unordered_map; - - QueryContextMap query_map; - - bool enable_filesystem_query_cache_limit; - - QueryContextPtr getCurrentQueryContext(std::lock_guard & cache_lock); - - QueryContextPtr getQueryContext(const String & query_id, std::lock_guard & cache_lock); - - void removeQueryContext(const String & query_id); - - QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard &); - -public: /// Save a query context information, and adopt different cache policies /// for different queries through the context cache layer. struct QueryContextHolder : private boost::noncopyable @@ -206,6 +129,43 @@ public: QueryContextHolder getQueryContextHolder(const String & query_id, const ReadSettings & settings); private: + String cache_base_path; + + size_t max_size; + size_t max_element_size; + size_t max_file_segment_size; + + bool allow_persistent_files; + size_t enable_cache_hits_threshold; + bool enable_filesystem_query_cache_limit; + + Poco::Logger * log; + bool is_initialized = false; + + mutable std::mutex mutex; + + bool tryReserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void remove( + Key key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + bool isLastFileSegmentHolder( + const Key & key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + void reduceSizeToDownloaded( + const Key & key, + size_t offset, + std::lock_guard & cache_lock, + std::lock_guard & segment_lock); + + void assertInitialized() const; + struct FileSegmentCell : private boost::noncopyable { FileSegmentPtr file_segment; @@ -223,24 +183,30 @@ private: FileSegmentCell(FileSegmentPtr file_segment_, FileCache * cache, std::lock_guard & cache_lock); FileSegmentCell(FileSegmentCell && other) noexcept - : file_segment(std::move(other.file_segment)), queue_iterator(other.queue_iterator) + : file_segment(std::move(other.file_segment)), queue_iterator(std::move(other.queue_iterator)) {} + }; + + using AccessKeyAndOffset = std::pair; + struct KeyAndOffsetHash + { + std::size_t operator()(const AccessKeyAndOffset & key) const { + return std::hash()(key.first.key) ^ std::hash()(key.second); } }; using FileSegmentsByOffset = std::map; using CachedFiles = std::unordered_map; + using FileCacheRecords = std::unordered_map; CachedFiles files; std::unique_ptr main_priority; FileCacheRecords stash_records; std::unique_ptr stash_priority; - size_t max_stash_element_size; - size_t enable_cache_hits_threshold; - Poco::Logger * log; + void loadCacheInfoIntoMemory(std::lock_guard & cache_lock); FileSegments getImpl(const Key & key, const FileSegment::Range & range, std::lock_guard & cache_lock); @@ -257,11 +223,11 @@ private: void useCell(const FileSegmentCell & cell, FileSegments & result, std::lock_guard & cache_lock) const; bool tryReserveForMainList( - const Key & key, size_t offset, size_t size, QueryContextPtr query_context, std::lock_guard & cache_lock); - - size_t getAvailableCacheSize() const; - - void loadCacheInfoIntoMemory(std::lock_guard & cache_lock); + const Key & key, + size_t offset, + size_t size, + QueryContextPtr query_context, + std::lock_guard & cache_lock); FileSegments splitRangeIntoCells( const Key & key, @@ -289,6 +255,48 @@ private: void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard & cache_lock); + /// Used to track and control the cache access of each query. + /// Through it, we can realize the processing of different queries by the cache layer. + struct QueryContext + { + FileCacheRecords records; + FileCachePriorityPtr priority; + + size_t cache_size = 0; + size_t max_cache_size; + + bool skip_download_if_exceeds_query_cache; + + QueryContext(size_t max_cache_size_, bool skip_download_if_exceeds_query_cache_) + : max_cache_size(max_cache_size_) + , skip_download_if_exceeds_query_cache(skip_download_if_exceeds_query_cache_) {} + + size_t getMaxCacheSize() const { return max_cache_size; } + + size_t getCacheSize() const { return cache_size; } + + FileCachePriorityPtr getPriority() const { return priority; } + + bool isSkipDownloadIfExceed() const { return skip_download_if_exceeds_query_cache; } + + void remove(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void reserve(const Key & key, size_t offset, size_t size, std::lock_guard & cache_lock); + + void use(const Key & key, size_t offset, std::lock_guard & cache_lock); + }; + + using QueryContextMap = std::unordered_map; + QueryContextMap query_map; + + QueryContextPtr getCurrentQueryContext(std::lock_guard & cache_lock); + + QueryContextPtr getQueryContext(const String & query_id, std::lock_guard & cache_lock); + + void removeQueryContext(const String & query_id); + + QueryContextPtr getOrSetQueryContext(const String & query_id, const ReadSettings & settings, std::lock_guard &); + public: void assertCacheCorrectness(const Key & key, std::lock_guard & cache_lock);