diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index d74be9db386..a4f94ccf313 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -351,12 +351,9 @@ void FileCache::fillHolesWithEmptyFileSegments( } } -KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool assert_initialized) +KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy) { - auto lock = cache_guard.lock(); - - if (assert_initialized) - assertInitializedUnlocked(lock); + std::lock_guard lock(key_locks_and_files_mutex); auto it = files.find(key); if (it == files.end()) @@ -391,6 +388,8 @@ KeyTransactionPtr FileCache::createKeyTransaction(const Key & key, KeyNotFoundPo FileSegmentsHolderPtr FileCache::set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings) { + assertInitialized(); + auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY); FileSegment::Range range(offset, offset + size - 1); @@ -416,6 +415,8 @@ FileSegmentsHolderPtr FileCache::getOrSet( size_t size, const CreateFileSegmentSettings & settings) { + assertInitialized(); + FileSegment::Range range(offset, offset + size - 1); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY); @@ -439,12 +440,14 @@ FileSegmentsHolderPtr FileCache::getOrSet( FileSegmentsHolderPtr FileCache::get(const Key & key, size_t offset, size_t size) { - /// Get all segments which intersect with the given range. + assertInitialized(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); if (key_transaction) { FileSegment::Range range(offset, offset + size - 1); + + /// Get all segments which intersect with the given range. auto file_segments = getImpl(key, range, *key_transaction); if (!file_segments.empty()) { @@ -533,6 +536,7 @@ FileCache::CacheCells::iterator FileCache::addCell( bool FileCache::tryReserve(const Key & key, size_t offset, size_t size) { + assertInitialized(); auto queue_lock = main_priority->lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::THROW); return tryReserveUnlocked(key, offset, size, key_transaction, queue_lock); @@ -859,6 +863,8 @@ bool FileCache::tryReserveInCache( void FileCache::removeIfExists(const Key & key) { + assertInitialized(); + auto queue_lock = main_priority->lock(); auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::RETURN_NULL); if (!key_transaction) @@ -899,6 +905,8 @@ void FileCache::removeIfExists(const Key & key) void FileCache::removeAllReleasable() { + assertInitialized(); + using QueueEntry = IFileCachePriority::Entry; using IterationResult = IFileCachePriority::IterationResult; @@ -1066,7 +1074,7 @@ void FileCache::loadCacheInfoIntoMemory() continue; } - auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY, false); + auto key_transaction = createKeyTransaction(key, KeyNotFoundPolicy::CREATE_EMPTY); key_transaction->getOffsets().created_base_directory = true; if (tryReserveUnlocked(key, offset, size, key_transaction, queue_lock)) @@ -1083,7 +1091,7 @@ void FileCache::loadCacheInfoIntoMemory() log, "Cache capacity changed (max size: {}, used: {}), " "cached file `{}` does not fit in cache anymore (size: {})", - max_size, getUsedCacheSize(), key_it->path().string(), size); + max_size, main_priority->getCacheSize(queue_lock), key_it->path().string(), size); fs::remove(offset_it->path()); } @@ -1132,8 +1140,8 @@ void KeyTransaction::reduceSizeToDownloaded( } assert(file_segment->downloaded_size <= file_segment->reserved_size); - assert((*cell->queue_iterator).size == file_segment->reserved_size); - assert((*cell->queue_iterator).size >= file_segment->downloaded_size); + assert((*cell->queue_iterator)->size == file_segment->reserved_size); + assert((*cell->queue_iterator)->size >= file_segment->downloaded_size); if (file_segment->reserved_size > file_segment->downloaded_size) { @@ -1147,17 +1155,20 @@ void KeyTransaction::reduceSizeToDownloaded( FileSegment::State::DOWNLOADED, create_settings); assert(file_segment->reserved_size == downloaded_size); - assert(cell->size() == (*cell->queue_iterator).size); + assert(cell->size() == (*cell->queue_iterator)->size); } FileSegmentsHolderPtr FileCache::getSnapshot() { + assertInitialized(); + + std::lock_guard lock(key_locks_and_files_mutex); + FileSegments file_segments; - auto lock = cache_guard.lock(); - for (const auto & [key, _] : files) + for (const auto & [key, offsets] : files) { - auto key_file_segments = getSnapshot(key); - file_segments.insert(file_segments.end(), key_file_segments->begin(), key_file_segments->end()); + for (const auto & [_, cell] : *offsets) + file_segments.push_back(FileSegment::getSnapshot(cell.file_segment)); } return std::make_unique(std::move(file_segments)); } @@ -1173,6 +1184,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key) FileSegmentsHolderPtr FileCache::dumpQueue() { + assertInitialized(); using QueueEntry = IFileCachePriority::Entry; using IterationResult = IFileCachePriority::IterationResult; @@ -1190,10 +1202,12 @@ FileSegmentsHolderPtr FileCache::dumpQueue() std::vector FileCache::tryGetCachePaths(const Key & key) { - auto lock = cache_guard.lock(); + assertInitialized(); + + std::lock_guard lock(key_locks_and_files_mutex); - std::vector cache_paths; const auto & cells_by_offset = files[key]; + std::vector cache_paths; for (const auto & [offset, cell] : *cells_by_offset) { @@ -1311,85 +1325,36 @@ KeyTransaction::KeyTransaction(KeyPrefixGuardPtr guard_, FileCache::CacheCellsPt { } -void FileCache::assertInitializedUnlocked(CacheGuard::Lock & lock) const +void FileCache::assertInitialized() const { - using State = InitializationState; + if (is_initialized) + return; - switch (initialization_state) - { - case State::NOT_INITIALIZED: - { - throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized"); - } - case State::FAILED: - { - assert(initialization_exception); - std::rethrow_exception(initialization_exception); - } - case State::INITIALIZED: - { - return; - } - case State::INITIALIZING: - { - initialization_cv.wait(lock.lock, [this]() { return initialization_state != State::INITIALIZING; }); - assertInitializedUnlocked(lock); - } - } + std::unique_lock lock(init_mutex); + if (is_initialized) + return; + + if (init_exception) + std::rethrow_exception(init_exception); + if (!is_initialized) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cache not initialized"); } void FileCache::initialize() { - using State = InitializationState; - { - auto lock = cache_guard.lock(); - - switch (initialization_state) - { - case State::NOT_INITIALIZED: - { - if (!fs::exists(cache_base_path)) - { - fs::create_directories(cache_base_path); - initialization_state = InitializationState::INITIALIZED; - return; - } - else - { - initialization_state = InitializationState::INITIALIZING; - } - break; - } - case State::FAILED: - { - assert(initialization_exception); - std::rethrow_exception(initialization_exception); - } - case State::INITIALIZED: - case State::INITIALIZING: - { - return; - } - } - } + std::lock_guard lock(init_mutex); try { loadCacheInfoIntoMemory(); - { - auto lock = cache_guard.lock(); - initialization_state = State::INITIALIZED; - } } catch (...) { - initialization_exception = std::current_exception(); - { - auto lock = cache_guard.lock(); - initialization_state = State::FAILED; - } + init_exception = std::current_exception(); throw; } + + is_initialized = true; } void FileCache::assertCacheCorrectness() diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 8ae22fbcca7..3a070938e2d 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -165,18 +165,9 @@ private: }; using CacheCellsPtr = std::shared_ptr; - mutable CacheGuard cache_guard; - - enum class InitializationState - { - NOT_INITIALIZED, - INITIALIZING, - INITIALIZED, - FAILED, - }; - InitializationState initialization_state = InitializationState::NOT_INITIALIZED; - mutable std::condition_variable initialization_cv; - std::exception_ptr initialization_exception; + std::exception_ptr init_exception; + std::atomic is_initialized = false; + mutable std::mutex init_mutex; using CachedFiles = std::unordered_map; CachedFiles files; @@ -185,6 +176,8 @@ private: using KeysLocksMap = std::unordered_map; KeysLocksMap keys_locks; + mutable std::mutex key_locks_and_files_mutex; /// Protects `files` and `keys_locks` + enum class KeyNotFoundPolicy { THROW, @@ -192,7 +185,7 @@ private: RETURN_NULL, }; - KeyTransactionPtr createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy, bool assert_initialized = true); + KeyTransactionPtr createKeyTransaction(const Key & key, KeyNotFoundPolicy key_not_found_policy); KeyTransactionCreatorPtr getKeyTransactionCreator(const Key & key, KeyTransaction & key_transaction); @@ -285,7 +278,7 @@ public: }; private: - void assertInitializedUnlocked(CacheGuard::Lock &) const; + void assertInitialized() const; void loadCacheInfoIntoMemory(); diff --git a/src/Interpreters/Cache/Guards.h b/src/Interpreters/Cache/Guards.h index 363950cb677..c286186ddab 100644 --- a/src/Interpreters/Cache/Guards.h +++ b/src/Interpreters/Cache/Guards.h @@ -11,23 +11,6 @@ namespace DB * Cache priority queue guard > key prefix guard > file segment guard. */ -/** - * Guard for the whole the cache. - * Used to get a lock per key prefix and then continue with only key prefix locked. - */ -struct CacheGuard -{ - struct Lock - { - explicit Lock(CacheGuard & guard) : lock(guard.mutex) {} - std::unique_lock lock; - }; - - std::mutex mutex; - - Lock lock() { return Lock(*this); } -}; - /** * Guard for a set of keys. * One guard per key prefix (first three digits of the path hash).