Merge pull request #57578 from ClickHouse/allow-to-change-some-cache-settings-without-restart

Allow to apply some fs cache config settings changes without server restart
This commit is contained in:
Kseniia Sumarokova 2023-12-15 11:18:19 +01:00 committed by GitHub
commit 06438cc17b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 378 additions and 100 deletions

View File

@ -45,6 +45,7 @@
#include <Common/makeSocketAddress.h> #include <Common/makeSocketAddress.h>
#include <Common/FailPoint.h> #include <Common/FailPoint.h>
#include <Server/waitServersToFinish.h> #include <Server/waitServersToFinish.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Core/ServerUUID.h> #include <Core/ServerUUID.h>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFile.h> #include <IO/ReadBufferFromFile.h>
@ -1472,6 +1473,8 @@ try
#endif #endif
NamedCollectionUtils::reloadFromConfig(*config); NamedCollectionUtils::reloadFromConfig(*config);
FileCacheFactory::instance().updateSettingsFromConfig(*config);
ProfileEvents::increment(ProfileEvents::MainConfigLoads); ProfileEvents::increment(ProfileEvents::MainConfigLoads);
/// Must be the last. /// Must be the last.

View File

@ -98,7 +98,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings); auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations && FileCacheFactory::instance().getByName(cache_config_name)->getSettings().cache_on_write_operations
&& fs::path(object.remote_path).extension() != ".tmp"; && fs::path(object.remote_path).extension() != ".tmp";
/// Need to remove even if cache_on_write == false. /// Need to remove even if cache_on_write == false.

View File

@ -64,7 +64,7 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
} }
} }
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings); auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings, predefined_configuration ? "" : config_prefix);
auto disk = disk_it->second; auto disk = disk_it->second;
if (!dynamic_cast<const DiskObjectStorage *>(disk.get())) if (!dynamic_cast<const DiskObjectStorage *>(disk.get()))
throw Exception(ErrorCodes::BAD_ARGUMENTS, throw Exception(ErrorCodes::BAD_ARGUMENTS,

View File

@ -76,10 +76,9 @@ FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & s
: max_file_segment_size(settings.max_file_segment_size) : max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0) , bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment) , boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads) , load_metadata_threads(settings.load_metadata_threads)
, metadata_download_threads(settings.load_metadata_threads)
, log(&Poco::Logger::get("FileCache(" + cache_name + ")")) , log(&Poco::Logger::get("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit) , metadata(settings.base_path, settings.background_download_queue_size_limit, settings.background_download_threads)
{ {
if (settings.cache_policy == "LRU") if (settings.cache_policy == "LRU")
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements); main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
@ -159,12 +158,8 @@ void FileCache::initialize()
throw; throw;
} }
metadata.startup();
is_initialized = true; is_initialized = true;
for (size_t i = 0; i < background_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
} }
CacheGuard::Lock FileCache::lockCache() const CacheGuard::Lock FileCache::lockCache() const
@ -299,7 +294,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
size_t size, size_t size,
FileSegment::State state, FileSegment::State state,
size_t file_segments_limit, size_t file_segments_limit,
const CreateFileSegmentSettings & settings) const CreateFileSegmentSettings & create_settings)
{ {
assert(size > 0); assert(size > 0);
@ -316,7 +311,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
remaining_size -= current_file_segment_size; remaining_size -= current_file_segment_size;
auto file_segment_metadata_it = addFileSegment( auto file_segment_metadata_it = addFileSegment(
locked_key, current_pos, current_file_segment_size, state, settings, nullptr); locked_key, current_pos, current_file_segment_size, state, create_settings, nullptr);
file_segments.push_back(file_segment_metadata_it->second->file_segment); file_segments.push_back(file_segment_metadata_it->second->file_segment);
current_pos += current_file_segment_size; current_pos += current_file_segment_size;
@ -331,7 +326,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
const FileSegment::Range & range, const FileSegment::Range & range,
size_t file_segments_limit, size_t file_segments_limit,
bool fill_with_detached_file_segments, bool fill_with_detached_file_segments,
const CreateFileSegmentSettings & settings) const CreateFileSegmentSettings & create_settings)
{ {
/// There are segments [segment1, ..., segmentN] /// There are segments [segment1, ..., segmentN]
/// (non-overlapping, non-empty, ascending-ordered) which (maybe partially) /// (non-overlapping, non-empty, ascending-ordered) which (maybe partially)
@ -388,7 +383,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments) if (fill_with_detached_file_segments)
{ {
auto file_segment = std::make_shared<FileSegment>( auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings); locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(it, file_segment); file_segments.insert(it, file_segment);
++processed_count; ++processed_count;
@ -399,7 +394,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole; FileSegments hole;
for (const auto & r : ranges) for (const auto & r : ranges)
{ {
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr); auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment); hole.push_back(metadata_it->second->file_segment);
++processed_count; ++processed_count;
@ -444,7 +439,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
if (fill_with_detached_file_segments) if (fill_with_detached_file_segments)
{ {
auto file_segment = std::make_shared<FileSegment>( auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, settings); locked_key.getKey(), current_pos, hole_size, FileSegment::State::DETACHED, create_settings);
file_segments.insert(file_segments.end(), file_segment); file_segments.insert(file_segments.end(), file_segment);
} }
@ -454,7 +449,7 @@ void FileCache::fillHolesWithEmptyFileSegments(
FileSegments hole; FileSegments hole;
for (const auto & r : ranges) for (const auto & r : ranges)
{ {
auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, settings, nullptr); auto metadata_it = addFileSegment(locked_key, r.left, r.size(), FileSegment::State::EMPTY, create_settings, nullptr);
hole.push_back(metadata_it->second->file_segment); hole.push_back(metadata_it->second->file_segment);
++processed_count; ++processed_count;
@ -473,7 +468,7 @@ FileSegmentsHolderPtr FileCache::set(
const Key & key, const Key & key,
size_t offset, size_t offset,
size_t size, size_t size,
const CreateFileSegmentSettings & settings) const CreateFileSegmentSettings & create_settings)
{ {
assertInitialized(); assertInitialized();
@ -484,17 +479,17 @@ FileSegmentsHolderPtr FileCache::set(
if (!file_segments.empty()) if (!file_segments.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Having intersection with already existing cache");
if (settings.unbounded) if (create_settings.unbounded)
{ {
/// If the file is unbounded, we can create a single file_segment_metadata for it. /// If the file is unbounded, we can create a single file_segment_metadata for it.
auto file_segment_metadata_it = addFileSegment( auto file_segment_metadata_it = addFileSegment(
*locked_key, offset, size, FileSegment::State::EMPTY, settings, nullptr); *locked_key, offset, size, FileSegment::State::EMPTY, create_settings, nullptr);
file_segments = {file_segment_metadata_it->second->file_segment}; file_segments = {file_segment_metadata_it->second->file_segment};
} }
else else
{ {
file_segments = splitRangeIntoFileSegments( file_segments = splitRangeIntoFileSegments(
*locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, settings); *locked_key, offset, size, FileSegment::State::EMPTY, /* file_segments_limit */0, create_settings);
} }
return std::make_unique<FileSegmentsHolder>(std::move(file_segments)); return std::make_unique<FileSegmentsHolder>(std::move(file_segments));
@ -506,7 +501,7 @@ FileCache::getOrSet(
size_t offset, size_t offset,
size_t size, size_t size,
size_t file_size, size_t file_size,
const CreateFileSegmentSettings & settings, const CreateFileSegmentSettings & create_settings,
size_t file_segments_limit) size_t file_segments_limit)
{ {
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds); ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheGetOrSetMicroseconds);
@ -612,7 +607,7 @@ FileCache::getOrSet(
if (file_segments.empty()) if (file_segments.empty())
{ {
file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, settings); file_segments = splitRangeIntoFileSegments(*locked_key, range.left, range.size(), FileSegment::State::EMPTY, file_segments_limit, create_settings);
} }
else else
{ {
@ -620,7 +615,7 @@ FileCache::getOrSet(
chassert(file_segments.back()->range().left <= range.right); chassert(file_segments.back()->range().left <= range.right);
fillHolesWithEmptyFileSegments( fillHolesWithEmptyFileSegments(
*locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, settings); *locked_key, file_segments, range, file_segments_limit, /* fill_with_detached */false, create_settings);
if (!file_segments.front()->range().contains(offset)) if (!file_segments.front()->range().contains(offset))
{ {
@ -675,7 +670,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
size_t offset, size_t offset,
size_t size, size_t size,
FileSegment::State state, FileSegment::State state,
const CreateFileSegmentSettings & settings, const CreateFileSegmentSettings & create_settings,
const CacheGuard::Lock * lock) const CacheGuard::Lock * lock)
{ {
/// Create a file_segment_metadata and put it in `files` map by [key][offset]. /// Create a file_segment_metadata and put it in `files` map by [key][offset].
@ -729,7 +724,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
result_state = state; result_state = state;
} }
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, settings, background_download_threads > 0, this, locked_key.getKeyMetadata()); auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, create_settings, metadata.isBackgroundDownloadEnabled(), this, locked_key.getKeyMetadata());
auto file_segment_metadata = std::make_shared<FileSegmentMetadata>(std::move(file_segment)); auto file_segment_metadata = std::make_shared<FileSegmentMetadata>(std::move(file_segment));
auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata); auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata);
@ -933,9 +928,9 @@ void FileCache::loadMetadataImpl()
std::mutex set_exception_mutex; std::mutex set_exception_mutex;
std::atomic<bool> stop_loading = false; std::atomic<bool> stop_loading = false;
LOG_INFO(log, "Loading filesystem cache with {} threads", metadata_download_threads); LOG_INFO(log, "Loading filesystem cache with {} threads", load_metadata_threads);
for (size_t i = 0; i < metadata_download_threads; ++i) for (size_t i = 0; i < load_metadata_threads; ++i)
{ {
try try
{ {
@ -1137,15 +1132,8 @@ FileCache::~FileCache()
void FileCache::deactivateBackgroundOperations() void FileCache::deactivateBackgroundOperations()
{ {
metadata.cancelDownload(); shutdown.store(true);
metadata.cancelCleanup(); metadata.shutdown();
for (auto & thread : download_threads)
if (thread.joinable())
thread.join();
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
} }
std::vector<FileSegment::Info> FileCache::getFileSegmentInfos() std::vector<FileSegment::Info> FileCache::getFileSegmentInfos()
@ -1220,6 +1208,43 @@ void FileCache::assertCacheCorrectness()
}); });
} }
void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings)
{
if (!is_initialized || shutdown || new_settings == actual_settings)
return;
std::lock_guard lock(apply_settings_mutex);
if (metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit))
{
LOG_INFO(log, "Changed background_download_queue_size from {} to {}",
actual_settings.background_download_queue_size_limit,
new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}
bool updated;
try
{
updated = metadata.setBackgroundDownloadThreads(new_settings.background_download_threads);
}
catch (...)
{
actual_settings.background_download_threads = metadata.getBackgroundDownloadThreads();
throw;
}
if (updated)
{
LOG_INFO(log, "Changed background_download_threads from {} to {}",
actual_settings.background_download_threads,
new_settings.background_download_threads);
actual_settings.background_download_threads = new_settings.background_download_threads;
}
}
FileCache::QueryContextHolder::QueryContextHolder( FileCache::QueryContextHolder::QueryContextHolder(
const String & query_id_, const String & query_id_,
FileCache * cache_, FileCache * cache_,
@ -1242,13 +1267,13 @@ FileCache::QueryContextHolder::~QueryContextHolder()
} }
FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder( FileCache::QueryContextHolderPtr FileCache::getQueryContextHolder(
const String & query_id, const ReadSettings & settings) const String & query_id, const ReadSettings & read_settings)
{ {
if (!query_limit || settings.filesystem_cache_max_download_size == 0) if (!query_limit || read_settings.filesystem_cache_max_download_size == 0)
return {}; return {};
auto lock = lockCache(); auto lock = lockCache();
auto context = query_limit->getOrSetQueryContext(query_id, settings, lock); auto context = query_limit->getOrSetQueryContext(query_id, read_settings, lock);
return std::make_unique<QueryContextHolder>(query_id, this, std::move(context)); return std::make_unique<QueryContextHolder>(query_id, this, std::move(context));
} }

View File

@ -16,6 +16,7 @@
#include <Interpreters/Cache/Metadata.h> #include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/QueryLimit.h> #include <Interpreters/Cache/QueryLimit.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h> #include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Interpreters/Cache/FileCacheSettings.h>
#include <filesystem> #include <filesystem>
@ -150,14 +151,15 @@ public:
std::vector<FileSegment::Info> sync(); std::vector<FileSegment::Info> sync();
void applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings);
private: private:
using KeyAndOffset = FileCacheKeyAndOffset; using KeyAndOffset = FileCacheKeyAndOffset;
const size_t max_file_segment_size; const size_t max_file_segment_size;
const size_t bypass_cache_threshold = 0; const size_t bypass_cache_threshold;
const size_t boundary_alignment; const size_t boundary_alignment;
const size_t background_download_threads; /// 0 means background download is disabled. size_t load_metadata_threads;
const size_t metadata_download_threads;
Poco::Logger * log; Poco::Logger * log;
@ -165,6 +167,9 @@ private:
std::atomic<bool> is_initialized = false; std::atomic<bool> is_initialized = false;
mutable std::mutex init_mutex; mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file; std::unique_ptr<StatusFile> status_file;
std::atomic<bool> shutdown = false;
std::mutex apply_settings_mutex;
CacheMetadata metadata; CacheMetadata metadata;
@ -195,12 +200,6 @@ private:
* then allowed loaded cache size is std::min(n - k, max_query_cache_size). * then allowed loaded cache size is std::min(n - k, max_query_cache_size).
*/ */
FileCacheQueryLimitPtr query_limit; FileCacheQueryLimitPtr query_limit;
/**
* A background cleanup task.
* Clears removed cache entries from metadata.
*/
std::vector<ThreadFromGlobalPool> download_threads;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
void assertInitialized() const; void assertInitialized() const;
void assertCacheCorrectness(); void assertCacheCorrectness();

View File

@ -9,6 +9,22 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS; extern const int BAD_ARGUMENTS;
} }
FileCacheFactory::FileCacheData::FileCacheData(
FileCachePtr cache_,
const FileCacheSettings & settings_,
const std::string & config_path_)
: cache(cache_)
, config_path(config_path_)
, settings(settings_)
{
}
FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
{
std::lock_guard lock(settings_mutex);
return settings;
}
FileCacheFactory & FileCacheFactory::instance() FileCacheFactory & FileCacheFactory::instance()
{ {
static FileCacheFactory ret; static FileCacheFactory ret;
@ -22,7 +38,9 @@ FileCacheFactory::CacheByName FileCacheFactory::getAll()
} }
FileCachePtr FileCacheFactory::getOrCreate( FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_name, const FileCacheSettings & file_cache_settings) const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -31,13 +49,16 @@ FileCachePtr FileCacheFactory::getOrCreate(
{ {
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings); auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace( it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first; cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
} }
return it->second->cache; return it->second->cache;
} }
FileCachePtr FileCacheFactory::create(const std::string & cache_name, const FileCacheSettings & file_cache_settings) FileCachePtr FileCacheFactory::create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -47,12 +68,12 @@ FileCachePtr FileCacheFactory::create(const std::string & cache_name, const File
auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings); auto cache = std::make_shared<FileCache>(cache_name, file_cache_settings);
it = caches_by_name.emplace( it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first; cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings, config_path)).first;
return it->second->cache; return it->second->cache;
} }
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name) FileCacheFactory::FileCacheDataPtr FileCacheFactory::getByName(const std::string & cache_name)
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
@ -60,7 +81,41 @@ FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string &
if (it == caches_by_name.end()) if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name); throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name);
return *it->second; return it->second;
}
void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config)
{
CacheByName caches_by_name_copy;
{
std::lock_guard lock(mutex);
caches_by_name_copy = caches_by_name;
}
for (const auto & [_, cache_info] : caches_by_name_copy)
{
if (cache_info->config_path.empty())
continue;
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);
FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
old_settings = cache_info->settings;
}
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
}
}
} }
} }

View File

@ -6,7 +6,6 @@
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <unordered_map> #include <unordered_map>
#include <mutex> #include <mutex>
#include <list>
namespace DB namespace DB
{ {
@ -17,26 +16,42 @@ namespace DB
class FileCacheFactory final : private boost::noncopyable class FileCacheFactory final : private boost::noncopyable
{ {
public: public:
struct FileCacheData class FileCacheData
{ {
FileCachePtr cache; friend class FileCacheFactory;
FileCacheSettings settings; public:
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_);
FileCacheData() = default; FileCacheSettings getSettings() const;
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_) : cache(cache_), settings(settings_) {}
const FileCachePtr cache;
const std::string config_path;
private:
FileCacheSettings settings;
mutable std::mutex settings_mutex;
}; };
using FileCacheDataPtr = std::shared_ptr<FileCacheData>; using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>; using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
static FileCacheFactory & instance(); static FileCacheFactory & instance();
FileCachePtr getOrCreate(const std::string & cache_name, const FileCacheSettings & file_cache_settings); FileCachePtr getOrCreate(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
FileCachePtr create(const std::string & cache_name, const FileCacheSettings & file_cache_settings); FileCachePtr create(
const std::string & cache_name,
const FileCacheSettings & file_cache_settings,
const std::string & config_path);
CacheByName getAll(); CacheByName getAll();
FileCacheData getByName(const std::string & cache_name); FileCacheDataPtr getByName(const std::string & cache_name);
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);
private: private:
std::mutex mutex; std::mutex mutex;

View File

@ -38,6 +38,8 @@ struct FileCacheSettings
void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix); void loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);
void loadFromCollection(const NamedCollection & collection); void loadFromCollection(const NamedCollection & collection);
bool operator ==(const FileCacheSettings &) const = default;
private: private:
using FuncHas = std::function<bool(std::string_view)>; using FuncHas = std::function<bool(std::string_view)>;
using FuncGetUInt = std::function<size_t(std::string_view)>; using FuncGetUInt = std::function<size_t(std::string_view)>;

View File

@ -134,11 +134,12 @@ std::string KeyMetadata::getFileSegmentPath(const FileSegment & file_segment) co
/ CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind()); / CacheMetadata::getFileNameForFileSegment(file_segment.offset(), file_segment.getKind());
} }
CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_) CacheMetadata::CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_)
: path(path_) : path(path_)
, cleanup_queue(std::make_shared<CleanupQueue>()) , cleanup_queue(std::make_shared<CleanupQueue>())
, download_queue(std::make_shared<DownloadQueue>(background_download_queue_size_limit_)) , download_queue(std::make_shared<DownloadQueue>(background_download_queue_size_limit_))
, log(&Poco::Logger::get("CacheMetadata")) , log(&Poco::Logger::get("CacheMetadata"))
, download_threads_num(background_download_threads_)
{ {
} }
@ -458,11 +459,6 @@ void CacheMetadata::cleanupThreadFunc()
} }
} }
void CacheMetadata::cancelCleanup()
{
cleanup_queue->cancel();
}
class DownloadQueue class DownloadQueue
{ {
friend struct CacheMetadata; friend struct CacheMetadata;
@ -473,7 +469,7 @@ public:
{ {
{ {
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
if (cancelled || (queue_size_limit && queue.size() == queue_size_limit)) if (cancelled || (queue_size_limit && queue.size() >= queue_size_limit))
return false; return false;
queue.push(DownloadInfo{file_segment->key(), file_segment->offset(), file_segment}); queue.push(DownloadInfo{file_segment->key(), file_segment->offset(), file_segment});
} }
@ -483,6 +479,8 @@ public:
return true; return true;
} }
bool setQueueLimit(size_t size) { return queue_size_limit.exchange(size) != size; }
private: private:
void cancel() void cancel()
{ {
@ -493,8 +491,8 @@ private:
cv.notify_all(); cv.notify_all();
} }
const size_t queue_size_limit; std::atomic<size_t> queue_size_limit;
std::mutex mutex; mutable std::mutex mutex;
std::condition_variable cv; std::condition_variable cv;
bool cancelled = false; bool cancelled = false;
@ -515,7 +513,7 @@ private:
std::queue<DownloadInfo> queue; std::queue<DownloadInfo> queue;
}; };
void CacheMetadata::downloadThreadFunc() void CacheMetadata::downloadThreadFunc(const bool & stop_flag)
{ {
std::optional<Memory<>> memory; std::optional<Memory<>> memory;
while (true) while (true)
@ -526,13 +524,13 @@ void CacheMetadata::downloadThreadFunc()
{ {
std::unique_lock lock(download_queue->mutex); std::unique_lock lock(download_queue->mutex);
if (download_queue->cancelled) if (download_queue->cancelled || stop_flag)
return; return;
if (download_queue->queue.empty()) if (download_queue->queue.empty())
{ {
download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty(); }); download_queue->cv.wait(lock, [&](){ return download_queue->cancelled || !download_queue->queue.empty() || stop_flag; });
if (download_queue->cancelled) if (download_queue->cancelled || stop_flag)
return; return;
} }
@ -607,6 +605,11 @@ void CacheMetadata::downloadThreadFunc()
} }
} }
bool CacheMetadata::setBackgroundDownloadQueueSizeLimit(size_t size)
{
return download_queue->setQueueLimit(size);
}
void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory) void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory)
{ {
LOG_TEST( LOG_TEST(
@ -670,9 +673,86 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog()); LOG_TEST(log, "Downloaded file segment: {}", file_segment.getInfoForLog());
} }
void CacheMetadata::cancelDownload() void CacheMetadata::startup()
{
download_threads.reserve(download_threads_num);
for (size_t i = 0; i < download_threads_num; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>([this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ cleanupThreadFunc(); }});
}
void CacheMetadata::shutdown()
{ {
download_queue->cancel(); download_queue->cancel();
cleanup_queue->cancel();
for (auto & download_thread : download_threads)
{
if (download_thread->thread && download_thread->thread->joinable())
download_thread->thread->join();
}
if (cleanup_thread && cleanup_thread->joinable())
cleanup_thread->join();
}
bool CacheMetadata::isBackgroundDownloadEnabled()
{
return download_threads_num;
}
bool CacheMetadata::setBackgroundDownloadThreads(size_t threads_num)
{
if (threads_num == download_threads_num)
return false;
if (threads_num > download_threads_num)
{
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
size_t add_threads = threads_num - download_threads_num;
for (size_t i = 0; i < add_threads; ++i)
{
download_threads.emplace_back(std::make_shared<DownloadThread>());
try
{
download_threads.back()->thread = std::make_unique<ThreadFromGlobalPool>(
[this, thread = download_threads.back()] { downloadThreadFunc(thread->stop_flag); });
}
catch (...)
{
download_threads.pop_back();
throw;
}
}
}
else if (threads_num < download_threads_num)
{
size_t remove_threads = download_threads_num - threads_num;
{
std::lock_guard lock(download_queue->mutex);
for (size_t i = 0; i < remove_threads; ++i)
download_threads[download_threads.size() - 1 - i]->stop_flag = true;
}
download_queue->cv.notify_all();
SCOPE_EXIT({ download_threads_num = download_threads.size(); });
for (size_t i = 0; i < remove_threads; ++i)
{
chassert(download_threads.back()->stop_flag);
auto & thread = download_threads.back()->thread;
if (thread && thread->joinable())
thread->join();
download_threads.pop_back();
}
}
return true;
} }
LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_) LockedKey::LockedKey(std::shared_ptr<KeyMetadata> key_metadata_)

View File

@ -5,6 +5,7 @@
#include <Interpreters/Cache/FileCacheKey.h> #include <Interpreters/Cache/FileCacheKey.h>
#include <Interpreters/Cache/FileSegment.h> #include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache_fwd_internal.h> #include <Interpreters/Cache/FileCache_fwd_internal.h>
#include <Common/ThreadPool.h>
#include <shared_mutex> #include <shared_mutex>
namespace DB namespace DB
@ -102,7 +103,9 @@ public:
using Key = FileCacheKey; using Key = FileCacheKey;
using IterateFunc = std::function<void(LockedKey &)>; using IterateFunc = std::function<void(LockedKey &)>;
explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_); explicit CacheMetadata(const std::string & path_, size_t background_download_queue_size_limit_, size_t background_download_threads_);
void startup();
const String & getBaseDirectory() const { return path; } const String & getBaseDirectory() const { return path; }
@ -138,21 +141,13 @@ public:
void removeKey(const Key & key, bool if_exists, bool if_releasable); void removeKey(const Key & key, bool if_exists, bool if_releasable);
void removeAllKeys(bool if_releasable); void removeAllKeys(bool if_releasable);
void cancelCleanup(); void shutdown();
/// Firstly, this cleanup does not delete cache files, bool setBackgroundDownloadThreads(size_t threads_num);
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs. size_t getBackgroundDownloadThreads() const { return download_threads.size(); }
/// Secondly, it deletes those only if arose as a result of bool setBackgroundDownloadQueueSizeLimit(size_t size);
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete().
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from objects storage.
/// E.g. number of elements submitted to background cleanup should remain low.
void cleanupThreadFunc();
void downloadThreadFunc(); bool isBackgroundDownloadEnabled();
void cancelDownload();
private: private:
const std::string path; /// Cache base path const std::string path; /// Cache base path
@ -172,6 +167,16 @@ private:
static constexpr size_t buckets_num = 1024; static constexpr size_t buckets_num = 1024;
std::vector<MetadataBucket> metadata_buckets{buckets_num}; std::vector<MetadataBucket> metadata_buckets{buckets_num};
struct DownloadThread
{
std::unique_ptr<ThreadFromGlobalPool> thread;
bool stop_flag{false};
};
std::vector<std::shared_ptr<DownloadThread>> download_threads;
std::atomic<size_t> download_threads_num;
std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
MetadataBucket & getMetadataBucket(const Key & key); MetadataBucket & getMetadataBucket(const Key & key);
void downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory); void downloadImpl(FileSegment & file_segment, std::optional<Memory<>> & memory);
MetadataBucket::iterator removeEmptyKey( MetadataBucket::iterator removeEmptyKey(
@ -179,6 +184,18 @@ private:
MetadataBucket::iterator it, MetadataBucket::iterator it,
LockedKey &, LockedKey &,
const CacheMetadataGuard::Lock &); const CacheMetadataGuard::Lock &);
void downloadThreadFunc(const bool & stop_flag);
/// Firstly, this cleanup does not delete cache files,
/// but only empty keys from cache_metadata_map and key (prefix) directories from fs.
/// Secondly, it deletes those only if arose as a result of
/// (1) eviction in FileCache::tryReserve();
/// (2) removal of cancelled non-downloaded file segments after FileSegment::complete().
/// which does not include removal of cache files because of FileCache::removeKey/removeAllKeys,
/// triggered by removal of source files from objects storage.
/// E.g. number of elements submitted to background cleanup should remain low.
void cleanupThreadFunc();
}; };

View File

@ -1094,7 +1094,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
if (shared->root_temp_data_on_disk) if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set"); throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).cache; auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->cache;
if (!file_cache) if (!file_cache)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", disk_ptr->getCacheName()); throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", disk_ptr->getCacheName());

View File

@ -42,8 +42,8 @@ BlockIO InterpreterDescribeCacheQuery::execute()
MutableColumns res_columns = sample_block.cloneEmptyColumns(); MutableColumns res_columns = sample_block.cloneEmptyColumns();
auto cache_data = FileCacheFactory::instance().getByName(ast.cache_name); auto cache_data = FileCacheFactory::instance().getByName(ast.cache_name);
const auto & settings = cache_data.settings; auto settings = cache_data->getSettings();
const auto & cache = cache_data.cache; const auto & cache = cache_data->cache;
size_t i = 0; size_t i = 0;
res_columns[i++]->insert(settings.max_size); res_columns[i++]->insert(settings.max_size);

View File

@ -380,7 +380,7 @@ BlockIO InterpreterSystemQuery::execute()
} }
else else
{ {
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache; auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
if (query.key_to_drop.empty()) if (query.key_to_drop.empty())
{ {
cache->removeAllReleasable(); cache->removeAllReleasable();
@ -434,7 +434,7 @@ BlockIO InterpreterSystemQuery::execute()
} }
else else
{ {
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache; auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
auto file_segments = cache->sync(); auto file_segments = cache->sync();
fill_data(query.filesystem_cache_name, cache, file_segments); fill_data(query.filesystem_cache_name, cache, file_segments);
} }

View File

@ -335,7 +335,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
&Poco::Logger::get("checkDataPart"), &Poco::Logger::get("checkDataPart"),
"Will drop cache for data part {} and will check it once again", data_part->name); "Will drop cache for data part {} and will check it once again", data_part->name);
auto & cache = *FileCacheFactory::instance().getByName(*cache_name).cache; auto & cache = *FileCacheFactory::instance().getByName(*cache_name)->cache;
for (auto it = data_part_storage.iterate(); it->isValid(); it->next()) for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{ {
auto file_name = it->name(); auto file_name = it->name();

View File

@ -78,7 +78,7 @@ Pipe StorageSystemDisks::read(
String cache_path; String cache_path;
if (disk_ptr->supportsCache()) if (disk_ptr->supportsCache())
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).settings.base_path; cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->getSettings().base_path;
col_cache_path->insert(cache_path); col_cache_path->insert(cache_path);
} }

View File

@ -63,7 +63,7 @@ Pipe StorageSystemRemoteDataPaths::read(
FileCachePtr cache; FileCachePtr cache;
if (disk->supportsCache()) if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache; cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache;
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path) for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
{ {

View File

@ -19,6 +19,16 @@
<cache_policy>LRU</cache_policy> <cache_policy>LRU</cache_policy>
<slru_size_ratio>0.3</slru_size_ratio> <slru_size_ratio>0.3</slru_size_ratio>
</s3_cache> </s3_cache>
<s3_cache_02933>
<type>cache</type>
<disk>s3_disk</disk>
<path>s3_cache_02933/</path>
<max_size>128Mi</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
<background_download_threads>0</background_download_threads>
<background_download_queue_size_limit>0</background_download_queue_size_limit>
</s3_cache_02933>
<!-- local disks --> <!-- local disks -->
<local_disk> <local_disk>
<type>local_blob_storage</type> <type>local_blob_storage</type>

View File

@ -0,0 +1,7 @@
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 10 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 5 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 15 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 2 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 1000 0 1
134217728 10000000 33554432 4194304 1 0 0 0 /var/lib/clickhouse/filesystem_caches/s3_cache_02933/ 0 0 0 1

View File

@ -0,0 +1,65 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-parallel, no-s3-storage
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
disk_name="s3_cache_02933"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
config_path=/etc/clickhouse-server/config.d/storage_conf.xml
config_path_tmp=$config_path.tmp
cat $config_path \
| sed "s|<background_download_threads>0<\/background_download_threads>|<background_download_threads>10<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
cat $config_path \
| sed "s|<background_download_queue_size_limit>0<\/background_download_queue_size_limit>|<background_download_queue_size_limit>1000<\/background_download_queue_size_limit>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>10<\/background_download_threads>|<background_download_threads>5<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>5<\/background_download_threads>|<background_download_threads>15<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>15<\/background_download_threads>|<background_download_threads>2<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_threads>2<\/background_download_threads>|<background_download_threads>0<\/background_download_threads>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"
cat $config_path \
| sed "s|<background_download_queue_size_limit>1000<\/background_download_queue_size_limit>|<background_download_queue_size_limit>0<\/background_download_queue_size_limit>|" \
> $config_path_tmp
mv $config_path_tmp $config_path
$CLICKHOUSE_CLIENT --query "SYSTEM RELOAD CONFIG"
$CLICKHOUSE_CLIENT --query "DESCRIBE FILESYSTEM CACHE '${disk_name}'"