This commit is contained in:
kssenii 2023-12-07 12:07:01 +01:00
parent f44f7c8c28
commit 614da21144
13 changed files with 132 additions and 66 deletions

View File

@ -98,7 +98,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name)->getSettings().cache_on_write_operations
&& fs::path(object.remote_path).extension() != ".tmp";
/// Need to remove even if cache_on_write == false.

View File

@ -53,13 +53,15 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings_)
: settings(settings_)
FileCache::FileCache(const std::string & cache_name, const FileCacheSettings & settings)
: max_file_segment_size(settings.max_file_segment_size)
, bypass_cache_threshold(settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0)
, boundary_alignment(settings.boundary_alignment)
, background_download_threads(settings.background_download_threads)
, load_metadata_threads(settings.load_metadata_threads)
, log(&Poco::Logger::get("FileCache(" + cache_name + ")"))
, metadata(settings.base_path, settings.background_download_queue_size_limit)
{
settings.bypass_cache_threshold = settings.enable_bypass_cache_with_threshold ? settings.bypass_cache_threshold : 0;
main_priority = std::make_unique<LRUFileCachePriority>(settings.max_size, settings.max_elements);
if (settings.cache_hits_threshold)
@ -131,12 +133,12 @@ void FileCache::initialize()
throw;
}
is_initialized = true;
for (size_t i = 0; i < settings.background_download_threads; ++i)
for (size_t i = 0; i < background_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
cleanup_thread = std::make_unique<ThreadFromGlobalPool>(std::function{ [this]{ metadata.cleanupThreadFunc(); }});
is_initialized = true;
}
CacheGuard::Lock FileCache::lockCache() const
@ -150,7 +152,7 @@ FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment:
/// Given range = [left, right] and non-overlapping ordered set of file segments,
/// find list [segment1, ..., segmentN] of segments which intersect with given range.
if (settings.bypass_cache_threshold && range.size() > settings.bypass_cache_threshold)
if (bypass_cache_threshold && range.size() > bypass_cache_threshold)
{
auto file_segment = std::make_shared<FileSegment>(
locked_key.getKey(), range.left, range.size(), FileSegment::State::DETACHED);
@ -255,7 +257,7 @@ std::vector<FileSegment::Range> FileCache::splitRange(size_t offset, size_t size
FileSegments file_segments;
while (current_pos < end_pos_non_included)
{
auto current_file_segment_size = std::min(remaining_size, settings.max_file_segment_size);
auto current_file_segment_size = std::min(remaining_size, max_file_segment_size);
ranges.emplace_back(current_pos, current_pos + current_file_segment_size - 1);
remaining_size -= current_file_segment_size;
@ -284,7 +286,7 @@ FileSegments FileCache::splitRangeIntoFileSegments(
FileSegments file_segments;
while (current_pos < end_pos_non_included && (!file_segments_limit || file_segments.size() < file_segments_limit))
{
current_file_segment_size = std::min(remaining_size, settings.max_file_segment_size);
current_file_segment_size = std::min(remaining_size, max_file_segment_size);
remaining_size -= current_file_segment_size;
auto file_segment_metadata_it = addFileSegment(
@ -487,8 +489,8 @@ FileCache::getOrSet(
FileSegment::Range range(offset, offset + size - 1);
const auto aligned_offset = roundDownToMultiple(range.left, settings.boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, settings.boundary_alignment), file_size) - 1;
const auto aligned_offset = roundDownToMultiple(range.left, boundary_alignment);
auto aligned_end_offset = std::min(roundUpToMultiple(offset + size, boundary_alignment), file_size) - 1;
chassert(aligned_offset <= range.left);
chassert(aligned_end_offset >= range.right);
@ -701,7 +703,7 @@ KeyMetadata::iterator FileCache::addFileSegment(
result_state = state;
}
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, create_settings, settings.background_download_threads > 0, this, locked_key.getKeyMetadata());
auto file_segment = std::make_shared<FileSegment>(key, offset, size, result_state, create_settings, background_download_threads > 0, this, locked_key.getKeyMetadata());
auto file_segment_metadata = std::make_shared<FileSegmentMetadata>(std::move(file_segment));
auto [file_segment_metadata_it, inserted] = locked_key.emplace(offset, file_segment_metadata);
@ -1049,9 +1051,9 @@ void FileCache::loadMetadataImpl()
std::mutex set_exception_mutex;
std::atomic<bool> stop_loading = false;
LOG_INFO(log, "Loading filesystem cache with {} threads", settings.load_metadata_threads);
LOG_INFO(log, "Loading filesystem cache with {} threads", load_metadata_threads);
for (size_t i = 0; i < settings.load_metadata_threads; ++i)
for (size_t i = 0; i < load_metadata_threads; ++i)
{
try
{
@ -1352,34 +1354,47 @@ void FileCache::assertCacheCorrectness()
});
}
FileCacheSettings FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings)
void FileCache::applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings)
{
if (!is_initialized)
return settings;
if (!is_initialized || shutdown)
return;
auto lock = lockCache();
if (shutdown)
return settings;
if (settings.background_download_queue_size_limit != new_settings.background_download_queue_size_limit)
size_t add_download_threads = 0;
{
LOG_DEBUG(log, "Changing background_download_queue_size_limit from {} to {}",
settings.background_download_queue_size_limit, new_settings.background_download_queue_size_limit);
std::lock_guard lock(apply_settings_mutex);
metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit);
settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
if (new_settings == actual_settings)
return;
size_t background_download_queue_size_limit = metadata.getBackgroundDownloadQueueSizeLimit();
if (background_download_queue_size_limit != new_settings.background_download_queue_size_limit)
{
LOG_DEBUG(log, "Changing background_download_queue_size_limit from {} to {}",
background_download_queue_size_limit, new_settings.background_download_queue_size_limit);
metadata.setBackgroundDownloadQueueSizeLimit(new_settings.background_download_queue_size_limit);
actual_settings.background_download_queue_size_limit = new_settings.background_download_queue_size_limit;
}
if (background_download_threads < new_settings.background_download_threads)
{
LOG_DEBUG(log, "Changing background_download_threads from {} to {}",
background_download_threads, new_settings.background_download_threads);
add_download_threads = new_settings.background_download_threads - background_download_threads;
background_download_threads = actual_settings.background_download_threads = new_settings.background_download_threads;
}
}
if (settings.background_download_threads < new_settings.background_download_threads)
{
LOG_DEBUG(log, "Changing background_download_threads from {} to {}", settings.background_download_threads, new_settings.background_download_threads);
size_t threads_to_add = new_settings.background_download_threads - settings.background_download_threads;
for (size_t i = 0; i < threads_to_add; ++i)
if (add_download_threads)
{
auto lock = lockCache();
if (shutdown)
return;
for (size_t i = 0; i < add_download_threads; ++i)
download_threads.emplace_back([this] { metadata.downloadThreadFunc(); });
settings.background_download_threads = new_settings.background_download_threads;
}
return settings;
}
FileCache::QueryContextHolder::QueryContextHolder(

View File

@ -56,7 +56,7 @@ public:
using PriorityIterator = IFileCachePriority::Iterator;
using PriorityIterationResult = IFileCachePriority::IterationResult;
FileCache(const std::string & cache_name, const FileCacheSettings & settings_);
FileCache(const std::string & cache_name, const FileCacheSettings & settings);
~FileCache();
@ -123,7 +123,7 @@ public:
size_t getFileSegmentsNum() const;
size_t getMaxFileSegmentSize() const { return settings.max_file_segment_size; }
size_t getMaxFileSegmentSize() const { return max_file_segment_size; }
bool tryReserve(FileSegment & file_segment, size_t size, FileCacheReserveStat & stat);
@ -155,12 +155,16 @@ public:
FileSegments sync();
FileCacheSettings applySettingsIfPossible(const FileCacheSettings & settings);
void applySettingsIfPossible(const FileCacheSettings & new_settings, FileCacheSettings & actual_settings);
private:
using KeyAndOffset = FileCacheKeyAndOffset;
FileCacheSettings settings;
const size_t max_file_segment_size;
const size_t bypass_cache_threshold;
const size_t boundary_alignment;
size_t background_download_threads; /// 0 means background download is disabled.
size_t load_metadata_threads;
Poco::Logger * log;
@ -168,7 +172,9 @@ private:
std::atomic<bool> is_initialized = false;
mutable std::mutex init_mutex;
std::unique_ptr<StatusFile> status_file;
size_t shutdown = false;
std::atomic<bool> shutdown = false;
std::mutex apply_settings_mutex;
CacheMetadata metadata;

View File

@ -9,6 +9,22 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
/// Constructs the per-cache record kept by the factory.
///
/// @param cache_        Cache object pointer; copied into `cache`.
/// @param settings_     Settings to store; copied into `settings`.
/// @param config_path_  Configuration path for this cache; copied into `config_path`.
///
/// The constructor only copies its arguments into the members; the body is intentionally empty.
FileCacheFactory::FileCacheData::FileCacheData(
FileCachePtr cache_,
const FileCacheSettings & settings_,
const std::string & config_path_)
: cache(cache_)
, config_path(config_path_)
, settings(settings_)
{
}
/// Returns a copy of the stored settings.
/// The copy is made while `settings_mutex` is held, so the caller gets a snapshot by value
/// rather than a reference to the member.
FileCacheSettings FileCacheFactory::FileCacheData::getSettings() const
{
    std::lock_guard guard(settings_mutex);
    FileCacheSettings snapshot = settings;
    return snapshot;
}
FileCacheFactory & FileCacheFactory::instance()
{
static FileCacheFactory ret;
@ -39,7 +55,7 @@ FileCachePtr FileCacheFactory::getOrCreate(
return it->second->cache;
}
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name)
FileCacheFactory::FileCacheDataPtr FileCacheFactory::getByName(const std::string & cache_name)
{
std::lock_guard lock(mutex);
@ -47,7 +63,7 @@ FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string &
if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name);
return *it->second;
return it->second;
}
void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config)
@ -63,13 +79,24 @@ void FileCacheFactory::updateSettingsFromConfig(const Poco::Util::AbstractConfig
if (cache_info->config_path.empty())
continue;
FileCacheSettings settings;
settings.loadFromConfig(config, cache_info->config_path);
FileCacheSettings new_settings;
new_settings.loadFromConfig(config, cache_info->config_path);
if (settings == cache_info->settings)
continue;
FileCacheSettings old_settings;
{
std::lock_guard lock(cache_info->settings_mutex);
if (new_settings == cache_info->settings)
continue;
cache_info->settings = cache_info->cache->applySettingsIfPossible(settings);
old_settings = cache_info->settings;
}
cache_info->cache->applySettingsIfPossible(new_settings, old_settings);
{
std::lock_guard lock(cache_info->settings_mutex);
cache_info->settings = old_settings;
}
}
}

View File

@ -16,16 +16,22 @@ namespace DB
class FileCacheFactory final : private boost::noncopyable
{
public:
struct FileCacheData
class FileCacheData
{
FileCachePtr cache;
FileCacheSettings settings;
std::string config_path;
friend class FileCacheFactory;
public:
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_);
FileCacheSettings getSettings() const;
const FileCachePtr cache;
const std::string config_path;
private:
FileCacheSettings settings;
mutable std::mutex settings_mutex;
};
FileCacheData() = default;
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_, const std::string & config_path_)
: cache(cache_), settings(settings_), config_path(config_path_) {}
};
using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
@ -38,7 +44,7 @@ public:
CacheByName getAll();
FileCacheData getByName(const std::string & cache_name);
FileCacheDataPtr getByName(const std::string & cache_name);
void updateSettingsFromConfig(const Poco::Util::AbstractConfiguration & config);

View File

@ -483,6 +483,12 @@ public:
return true;
}
/// Returns the current value of `queue_size_limit`, read while holding `mutex`.
size_t getQueueLimit() const
{
    std::lock_guard guard(mutex);
    const size_t current_limit = queue_size_limit;
    return current_limit;
}
void setQueueLimit(size_t size)
{
std::lock_guard lock(mutex);
@ -500,7 +506,7 @@ private:
}
size_t queue_size_limit;
std::mutex mutex;
mutable std::mutex mutex;
std::condition_variable cv;
bool cancelled = false;
@ -613,6 +619,11 @@ void CacheMetadata::downloadThreadFunc()
}
}
/// Returns the background download queue size limit by forwarding to
/// `download_queue->getQueueLimit()`.
size_t CacheMetadata::getBackgroundDownloadQueueSizeLimit() const
{
    const size_t limit = download_queue->getQueueLimit();
    return limit;
}
void CacheMetadata::setBackgroundDownloadQueueSizeLimit(size_t size)
{
download_queue->setQueueLimit(size);

View File

@ -154,6 +154,7 @@ public:
void cancelDownload();
size_t getBackgroundDownloadQueueSizeLimit() const;
void setBackgroundDownloadQueueSizeLimit(size_t size);
private:

View File

@ -1065,7 +1065,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).cache;
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->cache;
if (!file_cache)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", disk_ptr->getCacheName());

View File

@ -42,8 +42,8 @@ BlockIO InterpreterDescribeCacheQuery::execute()
MutableColumns res_columns = sample_block.cloneEmptyColumns();
auto cache_data = FileCacheFactory::instance().getByName(ast.cache_name);
const auto & settings = cache_data.settings;
const auto & cache = cache_data.cache;
auto settings = cache_data->getSettings();
const auto & cache = cache_data->cache;
size_t i = 0;
res_columns[i++]->insert(settings.max_size);

View File

@ -378,7 +378,7 @@ BlockIO InterpreterSystemQuery::execute()
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
@ -432,7 +432,7 @@ BlockIO InterpreterSystemQuery::execute()
}
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name)->cache;
auto file_segments = cache->sync();
fill_data(query.filesystem_cache_name, cache, file_segments);
}

View File

@ -335,7 +335,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
&Poco::Logger::get("checkDataPart"),
"Will drop cache for data part {} and will check it once again", data_part->name);
auto & cache = *FileCacheFactory::instance().getByName(*cache_name).cache;
auto & cache = *FileCacheFactory::instance().getByName(*cache_name)->cache;
for (auto it = data_part_storage.iterate(); it->isValid(); it->next())
{
auto file_name = it->name();

View File

@ -78,7 +78,7 @@ Pipe StorageSystemDisks::read(
String cache_path;
if (disk_ptr->supportsCache())
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).settings.base_path;
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName())->getSettings().base_path;
col_cache_path->insert(cache_path);
}

View File

@ -63,7 +63,7 @@ Pipe StorageSystemRemoteDataPaths::read(
FileCachePtr cache;
if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache;
cache = FileCacheFactory::instance().getByName(disk->getCacheName())->cache;
for (const auto & [local_path, storage_objects] : remote_paths_by_local_path)
{