This commit is contained in:
kssenii 2022-07-17 14:11:13 +02:00
parent ca09c6c2c0
commit 7866f7b96c
7 changed files with 56 additions and 16 deletions

View File

@ -99,8 +99,8 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>())
: executor(std::move(executor_))
explicit IDisk(std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
: executor(executor_)
{
}
@ -366,7 +366,7 @@ protected:
void copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir = true);
private:
std::unique_ptr<Executor> executor;
std::shared_ptr<Executor> executor;
};
using DiskPtr = std::shared_ptr<IDisk>;

View File

@ -16,7 +16,7 @@ class CachedObjectStorage : public IObjectStorage
public:
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_, const String & cache_config_name_);
std::string getName() const override { return "CachedObjectStorage(" + object_storage->getName() + ")"; }
std::string getName() const override { return fmt::format("CachedObjectStorage-{}({})", cache_config_name, object_storage->getName()); }
bool exists(const StoredObject & object) const override;

View File

@ -32,10 +32,9 @@ void registerDiskCache(DiskFactory & factory)
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"There is not disk with name `{}`, disk name should be initialized before cache disk",
disk_name);
"Cannot wrap disk `{}` with cache layer `{}`: there is no such disk (it should be initialized before cache disk)",
disk_name, name);
}
/// TODO: Add a check that there underlying storage is unique among cached storages.
FileCacheSettings file_cache_settings;
file_cache_settings.loadFromConfig(config, config_prefix);
@ -53,8 +52,8 @@ void registerDiskCache(DiskFactory & factory)
LOG_TEST(
&Poco::Logger::get("DiskCache"),
"Registered disk cached disk with structure: {}",
assert_cast<DiskObjectStorage *>(disk_object_storage.get())->getStructure());
"Registered disk cached disk (`{}`) with structure: {}",
name, assert_cast<DiskObjectStorage *>(disk_object_storage.get())->getStructure());
return disk_object_storage;
};

View File

@ -93,6 +93,12 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
std::shared_ptr<Executor> DiskObjectStorage::getAsyncExecutor(const std::string & log_name, size_t size)
{
static auto reader = std::make_shared<AsyncThreadPoolExecutor>(log_name, size);
return reader;
}
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_storage_root_path_,
@ -102,7 +108,7 @@ DiskObjectStorage::DiskObjectStorage(
DiskType disk_type_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(std::make_unique<AsyncThreadPoolExecutor>(log_name, thread_pool_size_))
: IDisk(getAsyncExecutor(log_name, thread_pool_size_))
, name(name_)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
@ -447,7 +453,15 @@ bool DiskObjectStorage::isReadOnly() const
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
return std::static_pointer_cast<DiskObjectStorage>(shared_from_this());
return std::make_shared<DiskObjectStorage>(
getName(),
object_storage_root_path,
getName(),
metadata_storage,
object_storage,
disk_type,
send_metadata,
threadpool_size);
}
void DiskObjectStorage::wrapWithCache(FileCachePtr cache, const String & layer_name)
@ -584,5 +598,4 @@ DiskObjectStorageReservation::~DiskObjectStorageReservation()
}
}
}

View File

@ -183,11 +183,13 @@ public:
/// DiskObjectStorage(S3ObjectStorage)
/// DiskObjectStorage(CachedObjectStorage(S3ObjectStorage))
/// DiskObjectStorage(CachedObjectStorage(CachedObjectStorage(S3ObjectStorage)))
String getStructure() const { return "DiskObjectStorage(" + object_storage->getName() + ")"; }
String getStructure() const { return fmt::format("DiskObjectStorage-{}({})", getName(), object_storage->getName()); }
/// Get names of all cache layers. Name is how cache is defined in configuration file.
NameSet getCacheLayersNames() const override;
static std::shared_ptr<Executor> getAsyncExecutor(const std::string & log_name, size_t size);
private:
/// Create actual disk object storage transaction for operations

View File

@ -58,8 +58,23 @@ bool MetadataStorageFromStaticFilesWebServer::exists(const std::string & path) c
void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & path) const
{
initializeIfNeeded(path);
if (!exists(path))
#ifdef NDEBUG
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "There is no path {}", path);
#else
{
std::string all_files;
for (const auto & [file, _] : object_storage.files)
{
if (!all_files.empty())
all_files += ", ";
all_files += file;
}
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "There is no path {} (available files: {})", path, all_files);
}
#endif
}
bool MetadataStorageFromStaticFilesWebServer::isFile(const std::string & path) const
@ -97,9 +112,8 @@ std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(
return result;
}
DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(const std::string & path) const
bool MetadataStorageFromStaticFilesWebServer::initializeIfNeeded(const std::string & path) const
{
std::vector<fs::path> dir_file_paths;
if (object_storage.files.find(path) == object_storage.files.end())
{
try
@ -114,10 +128,20 @@ DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(c
throw Exception(ErrorCodes::NETWORK_ERROR, "Cannot load disk metadata. Error: {}", message);
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Cannot load disk metadata. Error: {}", message);
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
return false;
}
}
return true;
}
DirectoryIteratorPtr MetadataStorageFromStaticFilesWebServer::iterateDirectory(const std::string & path) const
{
std::vector<fs::path> dir_file_paths;
if (!initializeIfNeeded(path))
return std::make_unique<DiskWebServerDirectoryIterator>(std::move(dir_file_paths));
assertExists(path);
for (const auto & [file_path, _] : object_storage.files)

View File

@ -18,6 +18,8 @@ private:
void assertExists(const std::string & path) const;
bool initializeIfNeeded(const std::string & path) const;
public:
explicit MetadataStorageFromStaticFilesWebServer(const WebObjectStorage & object_storage_);