Simplify FileCacheFactory

This commit is contained in:
kssenii 2023-04-15 13:08:49 +02:00
parent eafa3e8f64
commit 0517de58d1
25 changed files with 107 additions and 162 deletions

View File

@ -247,7 +247,7 @@ public:
/// Second bool param is a flag to remove (true) or keep (false) shared data on S3
virtual void removeSharedFileIfExists(const String & path, bool /* keep_shared_data */) { removeFileIfExists(path); }
virtual const String & getCacheBasePath() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no cache path"); }
virtual const String & getCacheName() const { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "There is no cache"); }
virtual bool supportsCache() const { return false; }

View File

@ -182,7 +182,7 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(cache->getBasePath()).cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations
&& fs::path(object.absolute_path).extension() != ".tmp";
auto path_key_for_cache = object.getPathKeyForCache();

View File

@ -87,7 +87,7 @@ public:
String getObjectsNamespace() const override;
const String & getCacheBasePath() const override { return cache->getBasePath(); }
const std::string & getCacheName() const override { return cache_config_name; }
std::string generateBlobNameForPath(const std::string & path) override;
@ -113,8 +113,6 @@ public:
WriteSettings getAdjustedSettingsFromMetadataFile(const WriteSettings & settings, const std::string & path) const override;
FileCachePtr getCache() const { return cache; }
private:
FileCache::Key getCacheKey(const std::string & path) const;

View File

@ -40,13 +40,11 @@ void registerDiskCache(DiskFactory & factory, bool /* global_skip_access_check *
FileCacheSettings file_cache_settings;
file_cache_settings.loadFromConfig(config, config_prefix);
auto cache_base_path = config.getString(config_prefix + ".path", fs::path(context->getPath()) / "disks" / name / "cache/");
if (!fs::exists(cache_base_path))
fs::create_directories(cache_base_path);
if (file_cache_settings.base_path.empty())
file_cache_settings.base_path = fs::path(context->getPath()) / "disks" / name / "cache/";
auto cache = FileCacheFactory::instance().getOrCreate(name, file_cache_settings);
auto disk = disk_it->second;
auto cache = FileCacheFactory::instance().getOrCreate(cache_base_path, file_cache_settings, name);
auto disk_object_storage = disk->createDiskObjectStorage();
disk_object_storage->wrapWithCache(cache, file_cache_settings, name);

View File

@ -534,14 +534,6 @@ void DiskObjectStorage::wrapWithCache(FileCachePtr cache, const FileCacheSetting
object_storage = std::make_shared<CachedObjectStorage>(object_storage, cache, cache_settings, layer_name);
}
FileCachePtr DiskObjectStorage::getCache() const
{
const auto * cached_object_storage = typeid_cast<CachedObjectStorage *>(object_storage.get());
if (!cached_object_storage)
return nullptr;
return cached_object_storage->getCache();
}
NameSet DiskObjectStorage::getCacheLayersNames() const
{
NameSet cache_layers;

View File

@ -51,10 +51,7 @@ public:
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithObjectStoragePaths> & paths_map) override;
const std::string & getCacheBasePath() const override
{
return object_storage->getCacheBasePath();
}
const std::string & getCacheName() const override { return object_storage->getCacheName(); }
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
@ -192,7 +189,6 @@ public:
/// There can be any number of cache layers:
/// DiskObjectStorage(CachedObjectStorage(...CacheObjectStorage(S3ObjectStorage)...))
void wrapWithCache(FileCachePtr cache, const FileCacheSettings & cache_settings, const String & layer_name);
FileCachePtr getCache() const;
/// Get structure of object storage this disk works with. Examples:
/// DiskObjectStorage(S3ObjectStorage)

View File

@ -58,9 +58,9 @@ void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
out->finalize();
}
const std::string & IObjectStorage::getCacheBasePath() const
const std::string & IObjectStorage::getCacheName() const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheBasePath() is not implemented for object storage");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheName() is not implemented for object storage");
}
void IObjectStorage::applyRemoteThrottlingSettings(ContextPtr context)

View File

@ -155,8 +155,7 @@ public:
virtual ~IObjectStorage() = default;
/// Path to directory with objects cache
virtual const std::string & getCacheBasePath() const;
virtual const std::string & getCacheName() const;
static IAsynchronousReader & getThreadPoolReader();

View File

@ -23,22 +23,20 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
FileCache::FileCache(
const String & cache_base_path_,
const FileCacheSettings & cache_settings_)
: cache_base_path(cache_base_path_)
, max_size(cache_settings_.max_size)
, max_element_size(cache_settings_.max_elements)
, max_file_segment_size(cache_settings_.max_file_segment_size)
, allow_persistent_files(cache_settings_.do_not_evict_index_and_mark_files)
, enable_cache_hits_threshold(cache_settings_.enable_cache_hits_threshold)
, enable_filesystem_query_cache_limit(cache_settings_.enable_filesystem_query_cache_limit)
, enable_bypass_cache_with_threashold(cache_settings_.enable_bypass_cache_with_threashold)
, bypass_cache_threashold(cache_settings_.bypass_cache_threashold)
FileCache::FileCache(const FileCacheSettings & settings)
: cache_base_path(settings.base_path)
, max_size(settings.max_size)
, max_element_size(settings.max_elements)
, max_file_segment_size(settings.max_file_segment_size)
, allow_persistent_files(settings.do_not_evict_index_and_mark_files)
, enable_cache_hits_threshold(settings.enable_cache_hits_threshold)
, enable_filesystem_query_cache_limit(settings.enable_filesystem_query_cache_limit)
, enable_bypass_cache_with_threashold(settings.enable_bypass_cache_with_threashold)
, bypass_cache_threashold(settings.bypass_cache_threashold)
, log(&Poco::Logger::get("FileCache"))
, main_priority(std::make_unique<LRUFileCachePriority>())
, stash_priority(std::make_unique<LRUFileCachePriority>())
, max_stash_element_size(cache_settings_.max_elements)
, max_stash_element_size(settings.max_elements)
{
}

View File

@ -39,7 +39,7 @@ using QueryContextPtr = std::shared_ptr<QueryContext>;
public:
using Key = DB::FileCacheKey;
FileCache(const String & cache_base_path_, const FileCacheSettings & cache_settings_);
explicit FileCache(const FileCacheSettings & settings);
~FileCache() = default;

View File

@ -15,76 +15,37 @@ FileCacheFactory & FileCacheFactory::instance()
return ret;
}
FileCacheFactory::CacheByBasePath FileCacheFactory::getAll()
{
std::lock_guard lock(mutex);
return caches_by_path;
}
const FileCacheSettings & FileCacheFactory::getSettings(const std::string & cache_base_path)
{
std::lock_guard lock(mutex);
auto it = caches_by_path.find(cache_base_path);
if (it == caches_by_path.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path);
return it->second->settings;
}
FileCachePtr FileCacheFactory::tryGet(const std::string & cache_base_path)
{
std::lock_guard lock(mutex);
auto it = caches_by_path.find(cache_base_path);
if (it == caches_by_path.end())
return nullptr;
return it->second->cache;
}
FileCachePtr FileCacheFactory::get(const std::string & cache_base_path)
{
auto file_cache_ptr = tryGet(cache_base_path);
if (!file_cache_ptr)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by path: {}", cache_base_path);
return file_cache_ptr;
}
FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_base_path, const FileCacheSettings & file_cache_settings, const std::string & name)
{
std::lock_guard lock(mutex);
auto it = caches_by_path.find(cache_base_path);
if (it != caches_by_path.end())
{
caches_by_name.emplace(name, it->second);
return it->second->cache;
}
auto cache = std::make_shared<FileCache>(cache_base_path, file_cache_settings);
FileCacheData result{cache, file_cache_settings};
auto cache_it = caches.insert(caches.end(), std::move(result));
caches_by_name.emplace(name, cache_it);
caches_by_path.emplace(cache_base_path, cache_it);
return cache;
}
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & name)
{
std::lock_guard lock(mutex);
auto it = caches_by_name.find(name);
if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No cache found by name: {}", name);
return *it->second;
}
FileCacheFactory::CacheByName FileCacheFactory::getAllByName()
FileCacheFactory::CacheByName FileCacheFactory::getAll()
{
std::lock_guard lock(mutex);
return caches_by_name;
}
FileCachePtr FileCacheFactory::getOrCreate(
const std::string & cache_name, const FileCacheSettings & file_cache_settings)
{
std::lock_guard lock(mutex);
auto it = caches_by_name.find(cache_name);
if (it == caches_by_name.end())
{
auto cache = std::make_shared<FileCache>(file_cache_settings);
it = caches_by_name.emplace(
cache_name, std::make_unique<FileCacheData>(cache, file_cache_settings)).first;
}
return it->second->cache;
}
FileCacheFactory::FileCacheData FileCacheFactory::getByName(const std::string & cache_name)
{
std::lock_guard lock(mutex);
auto it = caches_by_name.find(cache_name);
if (it == caches_by_name.end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no cache by name: {}", cache_name);
return *it->second;
}
}

View File

@ -22,33 +22,22 @@ public:
FileCachePtr cache;
FileCacheSettings settings;
FileCacheData() = default;
FileCacheData(FileCachePtr cache_, const FileCacheSettings & settings_) : cache(cache_), settings(settings_) {}
};
using Caches = std::list<FileCacheData>;
using CacheByBasePath = std::unordered_map<std::string, Caches::iterator>;
using CacheByName = std::unordered_map<std::string, Caches::iterator>;
using FileCacheDataPtr = std::shared_ptr<FileCacheData>;
using CacheByName = std::unordered_map<std::string, FileCacheDataPtr>;
static FileCacheFactory & instance();
FileCachePtr getOrCreate(const std::string & cache_base_path, const FileCacheSettings & file_cache_settings, const std::string & name);
FileCachePtr getOrCreate(const std::string & cache_name, const FileCacheSettings & file_cache_settings);
FileCachePtr tryGet(const std::string & cache_base_path);
FileCachePtr get(const std::string & cache_base_path);
CacheByName getAll();
CacheByBasePath getAll();
const FileCacheSettings & getSettings(const std::string & cache_base_path);
FileCacheData getByName(const std::string & name);
CacheByName getAllByName();
FileCacheData getByName(const std::string & cache_name);
private:
std::mutex mutex;
Caches caches;
CacheByBasePath caches_by_path;
CacheByName caches_by_name;
};

View File

@ -14,6 +14,11 @@ namespace ErrorCodes
void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
if (!config.has(config_prefix + ".path"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected cache path (`path`) in configuration");
base_path = config.getString(config_prefix + ".path");
if (!config.has(config_prefix + ".max_size"))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected cache size (`max_size`) in configuration");

View File

@ -1,6 +1,7 @@
#pragma once
#include <Interpreters/Cache/FileCache_fwd.h>
#include <string>
namespace Poco { namespace Util { class AbstractConfiguration; } } // NOLINT(cppcoreguidelines-virtual-class-destructor)
@ -9,6 +10,8 @@ namespace DB
struct FileCacheSettings
{
std::string base_path;
size_t max_size = 0;
size_t max_elements = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS;
size_t max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;

View File

@ -927,11 +927,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
if (shared->root_temp_data_on_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
const auto * disk_object_storage_ptr = dynamic_cast<const DiskObjectStorage *>(disk_ptr.get());
if (!disk_object_storage_ptr)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' does not use cache", cache_disk_name);
auto file_cache = disk_object_storage_ptr->getCache();
auto file_cache = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).cache;
if (!file_cache)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Cache '{}' is not found", file_cache->getBasePath());

View File

@ -177,7 +177,7 @@ BlockIO InterpreterShowTablesQuery::execute()
Block sample_block{ColumnWithTypeAndName(std::make_shared<DataTypeString>(), "Caches")};
MutableColumns res_columns = sample_block.cloneEmptyColumns();
auto caches = FileCacheFactory::instance().getAllByName();
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [name, _] : caches)
res_columns[0]->insert(name);
BlockIO res;

View File

@ -359,7 +359,8 @@ BlockIO InterpreterSystemQuery::execute()
case Type::DROP_FILESYSTEM_CACHE:
{
getContext()->checkAccess(AccessType::SYSTEM_DROP_FILESYSTEM_CACHE);
if (query.filesystem_cache_path.empty())
if (query.filesystem_cache_name.empty())
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache_data] : caches)
@ -367,7 +368,7 @@ BlockIO InterpreterSystemQuery::execute()
}
else
{
auto cache = FileCacheFactory::instance().get(query.filesystem_cache_path);
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
cache->removeIfReleasable();
}
break;

View File

@ -135,11 +135,12 @@ TEST_F(FileCacheTest, get)
DB::CurrentThread::QueryScope query_scope_holder(query_context);
DB::FileCacheSettings settings;
settings.base_path = cache_base_path;
settings.max_size = 30;
settings.max_elements = 5;
{
auto cache = DB::FileCache(cache_base_path, settings);
auto cache = DB::FileCache(settings);
cache.initialize();
auto key = cache.hash("key1");
@ -516,7 +517,7 @@ TEST_F(FileCacheTest, get)
{
/// Test LRUCache::restore().
auto cache2 = DB::FileCache(cache_base_path, settings);
auto cache2 = DB::FileCache(settings);
cache2.initialize();
auto key = cache2.hash("key1");
@ -537,7 +538,8 @@ TEST_F(FileCacheTest, get)
auto settings2 = settings;
settings2.max_file_segment_size = 10;
auto cache2 = DB::FileCache(caches_dir / "cache2", settings2);
settings2.base_path = caches_dir / "cache2";
auto cache2 = DB::FileCache(settings2);
cache2.initialize();
auto key = cache2.hash("key1");
@ -558,8 +560,9 @@ TEST_F(FileCacheTest, writeBuffer)
settings.max_size = 100;
settings.max_elements = 5;
settings.max_file_segment_size = 5;
settings.base_path = cache_base_path;
DB::FileCache cache(cache_base_path, settings);
DB::FileCache cache(settings);
cache.initialize();
auto write_to_cache = [&cache](const String & key, const Strings & data, bool flush)
@ -658,8 +661,9 @@ TEST_F(FileCacheTest, temporaryData)
DB::FileCacheSettings settings;
settings.max_size = 10_KiB;
settings.max_file_segment_size = 1_KiB;
settings.base_path = cache_base_path;
DB::FileCache file_cache(cache_base_path, settings);
DB::FileCache file_cache(settings);
file_cache.initialize();
auto tmp_data_scope = std::make_shared<TemporaryDataOnDiskScope>(nullptr, &file_cache, 0);

View File

@ -203,8 +203,8 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
}
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_path.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_path;
if (!filesystem_cache_name.empty())
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
}
else if (type == Type::UNFREEZE)
{

View File

@ -103,7 +103,7 @@ public:
String disk;
UInt64 seconds{};
String filesystem_cache_path;
String filesystem_cache_name;
String backup_name;

View File

@ -387,7 +387,7 @@ bool ParserSystemQuery::parseImpl(IParser::Pos & pos, ASTPtr & node, Expected &
ParserLiteral path_parser;
ASTPtr ast;
if (path_parser.parse(pos, ast, expected))
res->filesystem_cache_path = ast->as<ASTLiteral>()->value.safeGet<String>();
res->filesystem_cache_name = ast->as<ASTLiteral>()->value.safeGet<String>();
if (!parseQueryWithOnCluster(res, pos, expected))
return false;
break;

View File

@ -2,6 +2,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCacheFactory.h>
namespace DB
{
@ -77,7 +78,7 @@ Pipe StorageSystemDisks::read(
String cache_path;
if (disk_ptr->supportsCache())
cache_path = disk_ptr->getCacheBasePath();
cache_path = FileCacheFactory::instance().getByName(disk_ptr->getCacheName()).settings.base_path;
col_cache_path->insert(cache_path);
}

View File

@ -15,6 +15,7 @@ namespace DB
NamesAndTypesList StorageSystemFilesystemCache::getNamesAndTypes()
{
return {
{"cache_name", std::make_shared<DataTypeString>()},
{"cache_base_path", std::make_shared<DataTypeString>()},
{"cache_path", std::make_shared<DataTypeString>()},
{"file_segment_range_begin", std::make_shared<DataTypeUInt64>()},
@ -39,31 +40,32 @@ void StorageSystemFilesystemCache::fillData(MutableColumns & res_columns, Contex
{
auto caches = FileCacheFactory::instance().getAll();
for (const auto & [cache_base_path, cache_data] : caches)
for (const auto & [cache_name, cache_data] : caches)
{
const auto & cache = cache_data->cache;
auto file_segments = cache->getSnapshot();
for (const auto & file_segment : file_segments)
{
res_columns[0]->insert(cache_base_path);
res_columns[0]->insert(cache_name);
res_columns[1]->insert(cache->getBasePath());
/// Do not use `file_segment->getPathInLocalCache` here because it will lead to nullptr dereference
/// (because file_segments in getSnapshot doesn't have `cache` field set)
res_columns[1]->insert(
res_columns[2]->insert(
cache->getPathInLocalCache(file_segment->key(), file_segment->offset(), file_segment->getKind()));
const auto & range = file_segment->range();
res_columns[2]->insert(range.left);
res_columns[3]->insert(range.right);
res_columns[4]->insert(range.size());
res_columns[5]->insert(FileSegment::stateToString(file_segment->state()));
res_columns[6]->insert(file_segment->getHitsCount());
res_columns[7]->insert(file_segment->getRefCount());
res_columns[8]->insert(file_segment->getDownloadedSize());
res_columns[9]->insert(file_segment->isPersistent());
res_columns[10]->insert(toString(file_segment->getKind()));
res_columns[11]->insert(file_segment->isUnbound());
res_columns[3]->insert(range.left);
res_columns[4]->insert(range.right);
res_columns[5]->insert(range.size());
res_columns[6]->insert(FileSegment::stateToString(file_segment->state()));
res_columns[7]->insert(file_segment->getHitsCount());
res_columns[8]->insert(file_segment->getRefCount());
res_columns[9]->insert(file_segment->getDownloadedSize());
res_columns[10]->insert(file_segment->isPersistent());
res_columns[11]->insert(toString(file_segment->getKind()));
res_columns[12]->insert(file_segment->isUnbound());
}
}
}

View File

@ -61,10 +61,9 @@ Pipe StorageSystemRemoteDataPaths::read(
disk->getRemotePathsRecursive("data", remote_paths_by_local_path);
FileCachePtr cache;
auto cache_base_path = disk->supportsCache() ? disk->getCacheBasePath() : "";
if (!cache_base_path.empty())
cache = FileCacheFactory::instance().get(cache_base_path);
if (disk->supportsCache())
cache = FileCacheFactory::instance().getByName(disk->getCacheName()).cache;
for (const auto & [local_path, common_prefox_for_objects, storage_objects] : remote_paths_by_local_path)
{
@ -72,7 +71,10 @@ Pipe StorageSystemRemoteDataPaths::read(
{
col_disk_name->insert(disk_name);
col_base_path->insert(disk->getPath());
col_cache_base_path->insert(cache_base_path);
if (cache)
col_cache_base_path->insert(cache->getBasePath());
else
col_cache_base_path->insertDefault();
col_local_path->insert(local_path);
col_remote_path->insert(object.absolute_path);
col_size->insert(object.bytes_size);

View File

@ -31,7 +31,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_02286 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --multiline --multiquery --query "SYSTEM DROP FILESYSTEM CACHE './data'; --{serverError 36}"
$CLICKHOUSE_CLIENT --multiline --multiquery --query "SYSTEM DROP FILESYSTEM CACHE 'ff'; --{serverError 36}"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
@ -77,7 +77,7 @@ for STORAGE_POLICY in 's3_cache' 'local_cache'; do
$CLICKHOUSE_CLIENT --query "SELECT * FROM test_022862 FORMAT Null"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE '${STORAGE_POLICY}_2/'"
$CLICKHOUSE_CLIENT --query "SYSTEM DROP FILESYSTEM CACHE '${STORAGE_POLICY}_2'"
$CLICKHOUSE_CLIENT --query "SELECT count() FROM system.filesystem_cache"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS test_022862"