diff --git a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp index e459aae190c..742d735cc95 100644 --- a/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp +++ b/src/Disks/ObjectStorages/Cached/CachedObjectStorage.cpp @@ -98,7 +98,7 @@ std::unique_ptr CachedObjectStorage::writeObject( /// N auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings); bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations - && FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations + && FileCacheFactory::instance().getByName(cache_config_name)->getSettings().cache_on_write_operations && fs::path(object.remote_path).extension() != ".tmp"; /// Need to remove even if cache_on_write == false. diff --git a/src/Interpreters/Cache/FileCache.cpp b/src/Interpreters/Cache/FileCache.cpp index a9173a94c9c..f9cb8b5bd43 100644 --- a/src/Interpreters/Cache/FileCache.cpp +++ b/src/Interpreters/Cache/FileCache.cpp @@ -53,13 +53,15 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings_) - : settings(settings_) +FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings) + : max_file_segment_size(settings.max_file_segment_size) + , bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0) + , boundary_alignment(settings.boundary_alignment) + , background_download_threads(settings.background_download_threads) + , load_metadata_threads(settings.load_metadata_threads) , log(&Poco::Logger::get("FileCache(" + cache_name + ")")) , metadata(settings.base_path, settings.background_download_queue_size_limit) { - settings.bypass_cache_threshold = settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0; - main_priority = std::make_unique(settings.max_size, settings.max_elements); if (settings.cache_hits_threshold) @@ -131,12 +133,12 @@ void FileCache::initialize() throw; } - is_initialized = true; - - for (size_t i = 0; i < settings.background_download_threads; ++i) + for (size_t i = 0; i < background_download_threads; ++i) download_threads.emplace_back([this] { metadata.downloadThreadFunc(); }); cleanup_thread = std::make_unique(std::function{ [this]{ metadata.cleanupThreadFunc(); }}); + + is_initialized = true; } CacheGuard::Lock FileCache::lockCache() const @@ -150,7 +152,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment: /// Given range = [left, right] and non-overlapping ordered set of file segments, /// find list [segment1, ..., segmentN] of segments which intersect with given range. - if (settings.bypass_cache_threshold && range.size() > settings.bypass_cache_threshold) + if (bypass_cache_threshold && range.size() > bypass_cache_threshold) { auto file_segment = std::make_shared( locked_key.getKey(), range.left, range.size(), FileSegment::State::DETACHED); @@ -255,7 +257,7 @@ std::vector FileCache::splitRange(size_t offset, size_t size FileSegments file_segments; while (current_pos < end_pos_non_included) { - auto current_file_segment_size = std::min(remaining_size, settings.max_file_segment_size); + auto current_file_segment_size = std::min(remaining_size, max_file_segment_size); ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1); remaining_size -= current_file_segment_size; @@ -284,7 +286,7 @@ FileSegments FileCache::splitRangeIntoFileSegments( FileSegments file_segments; while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit)) { - current_file_segment_size = std::min(remaining_size, settings.max_file_segment_size); + current_file_segment_size = std::min(remaining_size, max_file_segment_size); remaining_size -= current_file_segment_size; auto file_segment_metadata_it = addFileSegment( @@ -487,8 +489,8 @@ FileCache::getOrSet( FileSegment::Range range(offset, offset + size - 1); - const auto aligned_offset = roundDownToMultiple(range.left, settings.boundary_alignment); - auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, settings.boundary_alignment), file_size) - 1; + const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment); + auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1; chassert(aligned_offset <= range.left); chassert(aligned_end_offset >= range.right); @@ -701,7 +703,7 @@ KeyMetadata::iterator FileCache::addFileSegment( result_state = state; } - auto file_segment = std::make_shared(key, offset, size, result_state, create_settings, settings.background_download_threads > 0, this, locked_key.getKeyMetadata()); + auto file_segment = std::make_shared(key, offset, size, result_state, create_settings, background_download_threads > 0, this, locked_key.getKeyMetadata()); auto file_segment_metadata = std::make_shared(std::move(file_segment)); auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata); @@ -1049,9 +1051,9 @@ void FileCache::loadMetadataImpl() std::mutex set_exception_mutex; std::atomic stop_loading = false; - LOG_INFO(log, "Loading filesystem cache with {} threads", settings.load_metadata_threads); + LOG_INFO(log, "Loading filesystem cache with {} threads", load_metadata_threads); - for (size_t i = 0; i < settings.load_metadata_threads; ++i) + for (size_t i = 0; i < load_metadata_threads; ++i) { try { @@ -1352,34 +1354,47 @@ void FileCache::assertCacheCorrectness() }); } -FileCacheSettings FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings) +void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings) { - if (!is_initialized) - return settings; + if (!is_initialized || shutdown) + return; - auto lock = lockCache(); - if (shutdown) - return settings; - - if (settings.background_download_queue_size_limit != new_settings.background_download_queue_size_limit) + size_t add_download_threads = 0; { - LOG_DEBUG(log, "Changing background_download_queue_size_limit from {} to {}", - settings.background_download_queue_size_limit, new_settings.background_download_queue_size_limit); + std::lock_guard lock(apply_settings_mutex); - metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit); - settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit; + if (new_settings == actual_settings) + return; + + size_t background_download_queue_size_limit = metadata.getBackgroundDownloadQueueSizeLimit(); + if (background_download_queue_size_limit != new_settings.background_download_queue_size_limit) + { + LOG_DEBUG(log, "Changing background_download_queue_size_limit from {} to {}", + background_download_queue_size_limit, new_settings.background_download_queue_size_limit); + + metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit); + actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit; + } + + if (background_download_threads < new_settings.background_download_threads) + { + LOG_DEBUG(log, "Changing background_download_threads from {} to {}", + background_download_threads, new_settings.background_download_threads); + + add_download_threads = new_settings.background_download_threads - background_download_threads; + background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads; + } } - if (settings.background_download_threads < new_settings.background_download_threads) - { - LOG_DEBUG(log, "Changing background_download_threads from {} to {}", settings.background_download_threads, new_settings.background_download_threads); - size_t threads_to_add = new_settings.background_download_threads - settings.background_download_threads; - for (size_t i = 0; i < threads_to_add; ++i) + if (add_download_threads) + { + auto lock = lockCache(); + if (shutdown) + return; + + for (size_t i = 0; i < add_download_threads; ++i) download_threads.emplace_back([this] { metadata.downloadThreadFunc(); }); - - settings.background_download_threads = new_settings.background_download_threads; } - return settings; } FileCache::QueryContextHolder::QueryContextHolder( diff --git a/src/Interpreters/Cache/FileCache.h b/src/Interpreters/Cache/FileCache.h index 3150eac6091..539332c88b0 100644 --- a/src/Interpreters/Cache/FileCache.h +++ b/src/Interpreters/Cache/FileCache.h @@ -56,7 +56,7 @@ public: using PriorityIterator = IFileCachePriority::Iterator; using PriorityIterationResult = IFileCachePriority::IterationResult; - FileCache(const std::string & cache_name, const FileCacheSettings & settings_); + FileCache(const std::string & cache_name, const FileCacheSettings & settings); ~FileCache(); @@ -123,7 +123,7 @@ public: size_t getFileSegmentsNum() const; - size_t getMaxFileSegmentSize() const { return settings.max_file_segment_size; } + size_t getMaxFileSegmentSize() const { return max_file_segment_size; } bool tryReserve(FileSegment & file_segment, size_t size, FileCacheReserveStat & stat); @@ -155,12 +155,16 @@ public: FileSegments sync(); - FileCacheSettings applySettingsIfPossible(const FileCacheSettings & settings); + void applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings); private: using KeyAndOffset = FileCacheKeyAndOffset; - FileCacheSettings settings; + const size_t max_file_segment_size; + const size_t bypass_cache_threshold; + const size_t boundary_alignment; + size_t background_download_threads; /// 0 means background download is disabled. + size_t load_metadata_threads; Poco::Logger * log; @@ -168,7 +172,9 @@ private: std::atomic is_initialized = false; mutable std::mutex init_mutex; std::unique_ptr status_file; - size_t shutdown = false; + std::atomic shutdown = false; + + std::mutex apply_settings_mutex; CacheMetadata metadata; diff --git a/src/Interpreters/Cache/FileCacheFactory.cpp b/src/Interpreters/Cache/FileCacheFactory.cpp index 3ae27c4d3aa..1c42416bfa9 100644 --- a/src/Interpreters/Cache/FileCacheFactory.cpp +++ b/src/Interpreters/Cache/FileCacheFactory.cpp @@ -9,6 +9,22 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; } +FileCacheFactory::FileCacheData::FileCacheData( + FileCachePtr cache_, + const FileCacheSettings & settings_, + const std::string & config_path_) + : cache(cache_) + , config_path(config_path_) + , settings(settings_) +{ +} + +FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const +{ + std::lock_guard lock(settings_mutex); + return settings; +} + FileCacheFactory & FileCacheFactory::instance() { static FileCacheFactory ret; @@ -39,7 +55,7 @@ FileCachePtr FileCacheFactory::getOrCreate( return it->second->cache; } -FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name) +FileCacheFactory::FileCacheDataPtr FileCacheFactory::getByName(const std::string & cache_name) { std::lock_guard lock(mutex); @@ -47,7 +63,7 @@ FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & if (it == caches_by_name.end()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name); - return *it->second; + return it->second; } void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config) @@ -63,13 +79,24 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig if (cache_info->config_path.empty()) continue; - FileCacheSettings settings; - settings.loadFromConfig(config, cache_info->config_path); + FileCacheSettings new_settings; + new_settings.loadFromConfig(config, cache_info->config_path); - if (settings == cache_info->settings) - continue; + FileCacheSettings old_settings; + { + std::lock_guard lock(cache_info->settings_mutex); + if (new_settings == cache_info->settings) + continue; - cache_info->settings = cache_info->cache->applySettingsIfPossible(settings); + old_settings = cache_info->settings; + } + + cache_info->cache->applySettingsIfPossible(new_settings, old_settings); + + { + std::lock_guard lock(cache_info->settings_mutex); + cache_info->settings = old_settings; + } } } diff --git a/src/Interpreters/Cache/FileCacheFactory.h b/src/Interpreters/Cache/FileCacheFactory.h index 0933e6e0d64..4e03beedbe9 100644 --- a/src/Interpreters/Cache/FileCacheFactory.h +++ b/src/Interpreters/Cache/FileCacheFactory.h @@ -16,16 +16,22 @@ namespace DB class FileCacheFactory final : private boost::noncopyable { public: - struct FileCacheData + class FileCacheData { - FileCachePtr cache; - FileCacheSettings settings; - std::string config_path; + friend class FileCacheFactory; + public: + FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_); + + FileCacheSettings getSettings() const; + + const FileCachePtr cache; + const std::string config_path; + + private: + FileCacheSettings settings; + mutable std::mutex settings_mutex; + }; - FileCacheData() = default; - FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_) - : cache(cache_), settings(settings_), config_path(config_path_) {} - }; using FileCacheDataPtr = std::shared_ptr; using CacheByName = std::unordered_map; @@ -38,7 +44,7 @@ public: CacheByName getAll(); - FileCacheData getByName(const std::string & cache_name); + FileCacheDataPtr getByName(const std::string & cache_name); void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config); diff --git a/src/Interpreters/Cache/Metadata.cpp b/src/Interpreters/Cache/Metadata.cpp index c6a1f56559f..a3c1ca7d082 100644 --- a/src/Interpreters/Cache/Metadata.cpp +++ b/src/Interpreters/Cache/Metadata.cpp @@ -483,6 +483,12 @@ public: return true; } + size_t getQueueLimit() const + { + std::lock_guard lock(mutex); + return queue_size_limit; + } + void setQueueLimit(size_t size) { std::lock_guard lock(mutex); @@ -500,7 +506,7 @@ private: } size_t queue_size_limit; - std::mutex mutex; + mutable std::mutex mutex; std::condition_variable cv; bool cancelled = false; @@ -613,6 +619,11 @@ void CacheMetadata::downloadThreadFunc() } } +size_t CacheMetadata::getBackgroundDownloadQueueSizeLimit() const +{ + return download_queue->getQueueLimit(); +} + void CacheMetadata::setBackgroundDownloadQueueSizeLimit(size_t size) { download_queue->setQueueLimit(size); diff --git a/src/Interpreters/Cache/Metadata.h b/src/Interpreters/Cache/Metadata.h index f197e1c2751..8259ebaa209 100644 --- a/src/Interpreters/Cache/Metadata.h +++ b/src/Interpreters/Cache/Metadata.h @@ -154,6 +154,7 @@ public: void cancelDownload(); + size_t getBackgroundDownloadQueueSizeLimit() const; void setBackgroundDownloadQueueSizeLimit(size_t size); private: diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index fac2ddc1243..c5860f8149c 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -1065,7 +1065,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t if (shared->root_temp_data_on_disk) throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set"); - auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).cache; + auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->cache; if (!file_cache) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", disk_ptr->getCacheName()); diff --git a/src/Interpreters/InterpreterDescribeCacheQuery.cpp b/src/Interpreters/InterpreterDescribeCacheQuery.cpp index 69b612eb2ef..54b43a8850b 100644 --- a/src/Interpreters/InterpreterDescribeCacheQuery.cpp +++ b/src/Interpreters/InterpreterDescribeCacheQuery.cpp @@ -42,8 +42,8 @@ BlockIO InterpreterDescribeCacheQuery::execute() MutableColumns res_columns = sample_block.cloneEmptyColumns(); auto cache_data = FileCacheFactory::instance().getByName(ast.cache_name); - const auto & settings = cache_data.settings; - const auto & cache = cache_data.cache; + auto settings = cache_data->getSettings(); + const auto & cache = cache_data->cache; size_t i = 0; res_columns[i++]->insert(settings.max_size); diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index 9c3db6cfdbd..579427a3999 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -378,7 +378,7 @@ BlockIO InterpreterSystemQuery::execute() } else { - auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache; + auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache; if (query.key_to_drop.empty()) { cache->removeAllReleasable(); @@ -432,7 +432,7 @@ BlockIO InterpreterSystemQuery::execute() } else { - auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache; + auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache; auto file_segments = cache->sync(); fill_data(query.filesystem_cache_name, cache, file_segments); } diff --git a/src/Storages/MergeTree/checkDataPart.cpp b/src/Storages/MergeTree/checkDataPart.cpp index a75df00e8a7..a68cacda2cc 100644 --- a/src/Storages/MergeTree/checkDataPart.cpp +++ b/src/Storages/MergeTree/checkDataPart.cpp @@ -335,7 +335,7 @@ IMergeTreeDataPart::Checksums checkDataPart( &Poco::Logger::get("checkDataPart"), "Will drop cache for data part {} and will check it once again", data_part->name); - auto & cache = *FileCacheFactory::instance().getByName(*cache_name).cache; + auto & cache = *FileCacheFactory::instance().getByName(*cache_name)->cache; for (auto it = data_part_storage.iterate(); it->isValid(); it->next()) { auto file_name = it->name(); diff --git a/src/Storages/System/StorageSystemDisks.cpp b/src/Storages/System/StorageSystemDisks.cpp index 23a00cc7ae5..9abbab9ff91 100644 --- a/src/Storages/System/StorageSystemDisks.cpp +++ b/src/Storages/System/StorageSystemDisks.cpp @@ -78,7 +78,7 @@ Pipe StorageSystemDisks::read( String cache_path; if (disk_ptr->supportsCache()) - cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).settings.base_path; + cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->getSettings().base_path; col_cache_path->insert(cache_path); } diff --git a/src/Storages/System/StorageSystemRemoteDataPaths.cpp b/src/Storages/System/StorageSystemRemoteDataPaths.cpp index b1cd90448ec..87b7a84e8ba 100644 --- a/src/Storages/System/StorageSystemRemoteDataPaths.cpp +++ b/src/Storages/System/StorageSystemRemoteDataPaths.cpp @@ -63,7 +63,7 @@ Pipe StorageSystemRemoteDataPaths::read( FileCachePtr cache; if (disk->supportsCache()) - cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache; + cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache; for (const auto & [local_path, storage_objects] : remote_paths_by_local_path) {