Adjustments for object storages

This commit is contained in:
kssenii 2022-07-04 20:09:57 +02:00
parent be671ad36d
commit ca53350f02
33 changed files with 502 additions and 410 deletions

View File

@ -81,8 +81,8 @@ public:
bool isCached() const override { return delegate->isCached(); }
DiskObjectStoragePtr getObjectStorage(const String &) override;
const std::unordered_set<String> & getCacheLayersNames() const override { return delegate->getCacheLayersNames(); }
String getCacheBasePath() const override { return delegate->getCacheBasePath(); }
PathsWithSize getObjectStoragePaths(const String & path) const override { return delegate->getObjectStoragePaths(path); }
const String & getCacheBasePath() const override { return delegate->getCacheBasePath(); }
StoredObjects getStorageObjects(const String & path) const override { return delegate->getStorageObjects(path); }
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }

View File

@ -600,8 +600,9 @@ DiskObjectStoragePtr DiskLocal::getObjectStorage(const String & name_)
{
auto object_storage = std::make_shared<LocalObjectStorage>();
auto metadata_storage = std::make_shared<MetadataStorageFromLocalDisk>(
std::static_pointer_cast<DiskLocal>(shared_from_this()),
object_storage);
/* metadata_storage */std::static_pointer_cast<DiskLocal>(shared_from_this()),
object_storage,
/* object_storage_root_path */getPath());
return std::make_shared<DiskObjectStorage>(
name_,

View File

@ -318,10 +318,10 @@ const String & DiskRestartProxy::getCacheBasePath() const
return DiskDecorator::getCacheBasePath();
}
PathsWithSize DiskRestartProxy::getObjectStoragePaths(const String & path) const
StoredObjects DiskRestartProxy::getStorageObjects(const String & path) const
{
ReadLock lock (mutex);
return DiskDecorator::getObjectStoragePaths(path);
return DiskDecorator::getStorageObjects(path);
}
void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map)

View File

@ -65,7 +65,7 @@ public:
String getUniqueId(const String & path) const override;
bool checkUniqueId(const String & id) const override;
const String & getCacheBasePath() const override;
PathsWithSize getObjectStoragePaths(const String & path) const override;
StoredObjects getStorageObjects(const String & path) const override;
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override;
void restart(ContextPtr context);

View File

@ -170,10 +170,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
auto remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.string().substr(url.size());
PathsWithSize blobs_to_read;
blobs_to_read.emplace_back(remote_path, iter->second.size);
StoredObjects objects;
objects.emplace_back(remote_path, iter->second.size);
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, blobs_to_read, getContext(), read_settings);
auto web_impl = std::make_unique<ReadBufferFromWebServerGather>(url, objects, getContext(), read_settings);
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{

View File

@ -174,7 +174,7 @@ public:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk {} is read-only", getName());
}
PathsWithSize getObjectStoragePaths(const String &) const override { return {}; }
StoredObjects getStorageObjects(const String &) const override { return {}; }
void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &) override {}

View File

@ -130,7 +130,7 @@ MetadataStoragePtr IDisk::getMetadataStorage()
else
{
auto object_storage = std::make_shared<LocalObjectStorage>();
return std::make_shared<MetadataStorageFromLocalDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), object_storage);
return std::make_shared<MetadataStorageFromLocalDisk>(std::static_pointer_cast<IDisk>(shared_from_this()), object_storage, getPath());
}
}

View File

@ -234,13 +234,13 @@ public:
/// Returns a list of paths because for Log family engines there might be
/// multiple files in remote fs for single clickhouse file.
virtual PathsWithSize getObjectStoragePaths(const String &) const
virtual StoredObjects getStorageObjects(const String &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getObjectStoragePaths() not implemented for disk: {}`", getType());
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method `getStorageObjects() not implemented for disk: {}`", getType());
}
/// For one local path there might be multiple remote paths in case of Log family engines.
using LocalPathWithRemotePaths = std::pair<String, PathsWithSize>;
using LocalPathWithRemotePaths = std::pair<String, StoredObjects>;
virtual void getRemotePathsRecursive(const String &, std::vector<LocalPathWithRemotePaths> &)
{

View File

@ -127,7 +127,7 @@ SeekableReadBufferPtr ReadBufferFromHDFSGather::createImplementationBufferImpl(c
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
: ReadBuffer(nullptr, 0)
, blobs_to_read(blobs_to_read_)
@ -136,6 +136,9 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (blobs_to_read.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects");
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!IFileCache::isReadOnly() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
@ -188,7 +191,7 @@ void ReadBufferFromRemoteFSGather::initialize()
auto current_buf_offset = file_offset_of_buffer_end;
for (size_t i = 0; i < blobs_to_read.size(); ++i)
{
const auto & [file_path, size] = blobs_to_read[i];
const auto & [file_path, size, _] = blobs_to_read[i];
if (size > current_buf_offset)
{
@ -223,10 +226,14 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
return true;
}
else
{
return false;
}
if (!moveToNextBuffer())
{
return false;
}
return readImpl();
}
@ -240,7 +247,7 @@ bool ReadBufferFromRemoteFSGather::moveToNextBuffer()
++current_buf_idx;
const auto & [path, size] = blobs_to_read[current_buf_idx];
const auto & [path, size, _] = blobs_to_read[current_buf_idx];
current_buf = createImplementationBuffer(path, size);
return true;
@ -325,7 +332,7 @@ size_t ReadBufferFromRemoteFSGather::getFileSize() const
{
size_t size = 0;
for (const auto & object : blobs_to_read)
size += object.size;
size += object.bytes_size;
return size;
}

View File

@ -27,7 +27,7 @@ friend class ReadIndirectBufferFromRemoteFS;
public:
ReadBufferFromRemoteFSGather(
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
~ReadBufferFromRemoteFSGather() override;
@ -53,7 +53,7 @@ public:
protected:
virtual SeekableReadBufferPtr createImplementationBufferImpl(const String & path, size_t file_size) = 0;
PathsWithSize blobs_to_read;
StoredObjects blobs_to_read;
ReadSettings settings;
@ -109,7 +109,7 @@ public:
std::shared_ptr<const Aws::S3::S3Client> client_ptr_,
const String & bucket_,
const String & version_id_,
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
size_t max_single_read_retries_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
@ -138,7 +138,7 @@ class ReadBufferFromAzureBlobStorageGather final : public ReadBufferFromRemoteFS
public:
ReadBufferFromAzureBlobStorageGather(
std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient> blob_container_client_,
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
size_t max_single_read_retries_,
size_t max_single_download_retries_,
const ReadSettings & settings_)
@ -164,7 +164,7 @@ class ReadBufferFromWebServerGather final : public ReadBufferFromRemoteFSGather
public:
ReadBufferFromWebServerGather(
const String & uri_,
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
ContextPtr context_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
@ -188,11 +188,10 @@ class ReadBufferFromHDFSGather final : public ReadBufferFromRemoteFSGather
public:
ReadBufferFromHDFSGather(
const Poco::Util::AbstractConfiguration & config_,
const PathsWithSize & blobs_to_read_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
: ReadBufferFromRemoteFSGather(blobs_to_read_, settings_)
, config(config_)
, hdfs_uri(hdfs_uri_)
{
}

View File

@ -37,13 +37,13 @@ std::string AzureObjectStorage::generateBlobNameForPath(const std::string & /* p
return getRandomASCIIString();
}
bool AzureObjectStorage::exists(const std::string & uri) const
bool AzureObjectStorage::exists(const StoredObject & object) const
{
auto client_ptr = client.get();
/// What a shame, no Exists method...
Azure::Storage::Blobs::ListBlobsOptions options;
options.Prefix = uri;
options.Prefix = object.path;
options.PageSizeHint = 1;
auto blobs_list_response = client_ptr->ListBlobs(options);
@ -51,7 +51,7 @@ bool AzureObjectStorage::exists(const std::string & uri) const
for (const auto & blob : blobs_list)
{
if (uri == blob.Name)
if (object.path == blob.Name)
return true;
}
@ -59,7 +59,7 @@ bool AzureObjectStorage::exists(const std::string & uri) const
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
@ -67,12 +67,12 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObject( /// NOLI
auto settings_ptr = settings.get();
return std::make_unique<ReadBufferFromAzureBlobStorage>(
client.get(), path, read_settings, settings_ptr->max_single_read_retries,
client.get(), object.path, read_settings, settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries, read_settings.remote_fs_buffer_size);
}
std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
@ -80,7 +80,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto settings_ptr = settings.get();
auto reader_impl = std::make_unique<ReadBufferFromAzureBlobStorageGather>(
client.get(),
paths_to_read,
objects,
settings_ptr->max_single_read_retries,
settings_ptr->max_single_download_retries,
read_settings);
@ -99,7 +99,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes>,
FinalizeCallback && finalize_callback,
@ -111,11 +111,11 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
path,
object.path,
settings.get()->max_single_part_upload_size,
buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), path);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.path);
}
void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
@ -133,36 +133,37 @@ void AzureObjectStorage::listPrefix(const std::string & path, RelativePathsWithS
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void AzureObjectStorage::removeObject(const std::string & path)
void AzureObjectStorage::removeObject(const StoredObject & object)
{
const auto & path = object.path;
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
}
void AzureObjectStorage::removeObjects(const PathsWithSize & paths)
void AzureObjectStorage::removeObjects(const StoredObjects & objects)
{
auto client_ptr = client.get();
for (const auto & [path, _] : paths)
for (const auto & object : objects)
{
auto delete_info = client_ptr->DeleteBlob(path);
auto delete_info = client_ptr->DeleteBlob(object.path);
if (!delete_info.Value.Deleted)
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", path);
throw Exception(ErrorCodes::AZURE_BLOB_STORAGE_ERROR, "Failed to delete file in AzureBlob Storage: {}", object.path);
}
}
void AzureObjectStorage::removeObjectIfExists(const std::string & path)
void AzureObjectStorage::removeObjectIfExists(const StoredObject & object)
{
auto client_ptr = client.get();
auto delete_info = client_ptr->DeleteBlob(path);
auto delete_info = client_ptr->DeleteBlob(object.path);
}
void AzureObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
void AzureObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
auto client_ptr = client.get();
for (const auto & [path, _] : paths)
auto delete_info = client_ptr->DeleteBlob(path);
for (const auto & object : objects)
auto delete_info = client_ptr->DeleteBlob(object.path);
}
@ -184,13 +185,14 @@ ObjectMetadata AzureObjectStorage::getObjectMetadata(const std::string & path) c
}
void AzureObjectStorage::copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ptr = client.get();
auto dest_blob_client = client_ptr->GetBlobClient(object_to);
auto source_blob_client = client_ptr->GetBlobClient(object_from);
auto dest_blob_client = client_ptr->GetBlobClient(object_to.path);
auto source_blob_client = client_ptr->GetBlobClient(object_from.path);
Azure::Storage::Blobs::CopyBlobFromUriOptions copy_options;
if (object_to_attributes.has_value())
{

View File

@ -49,23 +49,23 @@ public:
AzureClientPtr && client_,
SettingsPtr && settings_);
bool exists(const std::string & uri) const override;
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & blobs_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
@ -75,19 +75,19 @@ public:
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const std::string & path) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const PathsWithSize & paths) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const PathsWithSize & paths) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override {}

View File

@ -61,24 +61,24 @@ void CachedObjectStorage::startup()
object_storage->startup();
}
bool CachedObjectStorage::exists(const std::string & path) const
bool CachedObjectStorage::exists(const StoredObject & object) const
{
fs::path cache_path = getCachePath(path);
fs::path cache_path = getCachePath(object.getCacheHint());
if (fs::exists(cache_path) && !cache_path.empty())
return true;
return object_storage->exists(path);
return object_storage->exists(object);
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto modified_read_settings = getReadSettingsForCache(read_settings);
auto impl = object_storage->readObjects(paths_to_read, modified_read_settings, read_hint, file_size);
auto impl = object_storage->readObjects(objects, modified_read_settings, read_hint, file_size);
/// If underlying read buffer does caching on its own, do not wrap it in caching buffer.
if (impl->isIntegratedWithFilesystemCache()
@ -94,15 +94,15 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NO
auto implementation_buffer_creator = [=, this]()
{
auto implementation_buffer =
object_storage->readObjects(paths_to_read, modified_read_settings, read_hint, file_size);
object_storage->readObjects(objects, modified_read_settings, read_hint, file_size);
return std::make_unique<BoundedReadBuffer>(std::move(implementation_buffer));
};
if (paths_to_read.size() != 1)
if (objects.size() != 1)
throw Exception(ErrorCodes::CANNOT_USE_CACHE, "Unable to read multiple objects, support not added");
std::string path = paths_to_read[0].path;
IFileCache::Key key = getCacheKey(path);
std::string path = objects[0].path;
IFileCache::Key key = getCacheKey(objects[0].getCacheHint());
return std::make_unique<CachedReadBufferFromFile>(
path,
@ -118,13 +118,13 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObjects( /// NO
}
std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto modified_read_settings = getReadSettingsForCache(read_settings);
auto impl = object_storage->readObject(path, read_settings, read_hint, file_size);
auto impl = object_storage->readObject(object, read_settings, read_hint, file_size);
/// If underlying read buffer does caching on its own, do not wrap it in caching buffer.
if (impl->isIntegratedWithFilesystemCache()
@ -140,14 +140,14 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOL
auto implementation_buffer_creator = [=, this]()
{
auto implementation_buffer =
object_storage->readObject(path, read_settings, read_hint, file_size);
object_storage->readObject(object, read_settings, read_hint, file_size);
return std::make_unique<BoundedReadBuffer>(std::move(implementation_buffer));
};
IFileCache::Key key = getCacheKey(path);
LOG_TEST(log, "Reading from file `{}` with cache key `{}`", path, getCacheKey(path).toString());
IFileCache::Key key = getCacheKey(object.getCacheHint());
LOG_TEST(log, "Reading from file `{}` with cache key `{}`", object.path, key.toString());
return std::make_unique<CachedReadBufferFromFile>(
path,
object.path,
key,
cache,
implementation_buffer_creator,
@ -161,23 +161,27 @@ std::unique_ptr<ReadBufferFromFileBase> CachedObjectStorage::readObject( /// NOL
std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode, // Cached doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
auto impl = object_storage->writeObject(path, mode, attributes, std::move(finalize_callback), buf_size, write_settings);
auto impl = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, write_settings);
bool cache_on_write = fs::path(path).extension() != ".tmp"
bool cache_on_write = fs::path(object.path).extension() != ".tmp"
&& write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getSettings(cache->getBasePath()).cache_on_write_operations;
auto cache_hint = object.getCacheHint();
removeCacheIfExists(cache_hint);
if (cache_on_write)
{
auto key = getCacheKey(path);
LOG_TEST(log, "Caching file `{}` to `{}` with key {}", path, getCachePath(path), key.toString());
auto key = getCacheKey(cache_hint);
LOG_TEST(log, "Caching file `{}` to `{}` with key {}", object.path, getCachePath(cache_hint), key.toString());
return std::make_unique<CachedWriteBufferFromFile>(
std::move(impl),
cache,
@ -191,12 +195,12 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
return impl;
}
void CachedObjectStorage::removeCacheIfExists(const std::string & path)
void CachedObjectStorage::removeCacheIfExists(const std::string & cache_hint)
{
IFileCache::Key key;
try
{
key = getCacheKey(path);
key = getCacheKey(cache_hint);
}
catch (Exception & e)
{
@ -207,48 +211,46 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path)
cache->removeIfExists(key);
}
void CachedObjectStorage::removeObject(const std::string & path)
void CachedObjectStorage::removeObject(const StoredObject & object)
{
removeCacheIfExists(path);
object_storage->removeObject(path);
removeCacheIfExists(object.getCacheHint());
object_storage->removeObject(object);
}
void CachedObjectStorage::removeObjects(const PathsWithSize & paths)
void CachedObjectStorage::removeObjects(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeCacheIfExists(path);
for (const auto & object : objects)
removeCacheIfExists(object.getCacheHint());
object_storage->removeObjects(paths);
object_storage->removeObjects(objects);
}
void CachedObjectStorage::removeObjectIfExists(const std::string & path)
void CachedObjectStorage::removeObjectIfExists(const StoredObject & object)
{
removeCacheIfExists(path);
object_storage->removeObjectIfExists(path);
removeCacheIfExists(object.getCacheHint());
object_storage->removeObjectIfExists(object);
}
void CachedObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
void CachedObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeCacheIfExists(path);
for (const auto & object : objects)
removeCacheIfExists(object.getCacheHint());
object_storage->removeObjectsIfExist(paths);
object_storage->removeObjectsIfExist(objects);
}
void CachedObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes)
{
/// TODO: add something here?
object_storage->copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes);
}
void CachedObjectStorage::copyObject( // NOLINT
const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes)
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
/// TODO: add something here?
object_storage->copyObject(object_from, object_to, object_to_attributes);
}
@ -258,11 +260,10 @@ std::unique_ptr<IObjectStorage> CachedObjectStorage::cloneObjectStorage(
const std::string & config_prefix,
ContextPtr context)
{
/// TODO: add something here?
return object_storage->cloneObjectStorage(new_namespace, config, config_prefix, context);
}
void CachedObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void CachedObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
object_storage->listPrefix(path, children);
}

View File

@ -16,45 +16,45 @@ class CachedObjectStorage : public IObjectStorage
public:
CachedObjectStorage(ObjectStoragePtr object_storage_, FileCachePtr cache_);
bool exists(const std::string & path) const override;
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void removeObject(const std::string & path) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const PathsWithSize & paths) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const PathsWithSize & paths) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void copyObjectToAnotherObjectStorage( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
@ -64,7 +64,7 @@ public:
const std::string & config_prefix,
ContextPtr context) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
@ -85,7 +85,7 @@ public:
bool isRemote() const override { return object_storage->isRemote(); }
void removeCacheIfExists(const std::string & path) override;
void removeCacheIfExists(const std::string & path_hint_for_cache) override;
private:
IFileCache::Key getCacheKey(const std::string & path) const;

View File

@ -85,7 +85,6 @@ DiskTransactionPtr DiskObjectStorage::createTransaction()
return std::make_shared<DiskObjectStorageTransaction>(
*object_storage,
*metadata_storage,
object_storage_root_path,
send_metadata ? metadata_helper.get() : nullptr);
}
@ -101,7 +100,7 @@ DiskObjectStorage::DiskObjectStorage(
: IDisk(std::make_unique<AsyncThreadPoolExecutor>(log_name, thread_pool_size))
, name(name_)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get(log_name))
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
@ -109,9 +108,9 @@ DiskObjectStorage::DiskObjectStorage(
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
PathsWithSize DiskObjectStorage::getObjectStoragePaths(const String & local_path) const
StoredObjects DiskObjectStorage::getStorageObjects(const String & local_path) const
{
return metadata_storage->getObjectStoragePaths(local_path);
return metadata_storage->getStorageObjects(local_path);
}
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
@ -121,7 +120,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
{
try
{
paths_map.emplace_back(local_path, getObjectStoragePaths(local_path));
paths_map.emplace_back(local_path, getStorageObjects(local_path));
}
catch (const Exception & e)
{
@ -245,7 +244,7 @@ String DiskObjectStorage::getUniqueId(const String & path) const
{
LOG_TRACE(log, "Remote path: {}, Path: {}", object_storage_root_path, path);
String id;
auto blobs_paths = metadata_storage->getObjectStoragePaths(path);
auto blobs_paths = metadata_storage->getStorageObjects(path);
if (!blobs_paths.empty())
id = blobs_paths[0].path;
return id;
@ -256,7 +255,7 @@ bool DiskObjectStorage::checkObjectExists(const String & path) const
if (!path.starts_with(object_storage_root_path))
return false;
return object_storage->exists(path);
return object_storage->exists(StoredObject{path, 0});
}
bool DiskObjectStorage::checkUniqueId(const String & id) const
@ -471,8 +470,13 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
auto objects = metadata_storage->getStorageObjects(path);
String r;
for (const auto & object : objects)
r += object.path + ", ";
LOG_TEST(log, "Read: {}, objects: {} ({})", path, objects.size(), r);
return object_storage->readObjects(
metadata_storage->getObjectStoragePaths(path),
metadata_storage->getStorageObjects(path),
updateSettingsForReadWrite(path, settings),
read_hint,
file_size);
@ -484,6 +488,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorage::writeFile(
WriteMode mode,
const WriteSettings & settings)
{
LOG_TEST(log, "Write file: {}", path);
auto transaction = createTransaction();
auto result = transaction->writeFile(
path,

View File

@ -50,7 +50,7 @@ public:
const String & getPath() const override { return metadata_storage->getPath(); }
PathsWithSize getObjectStoragePaths(const String & local_path) const override;
StoredObjects getStorageObjects(const String & local_path) const override;
void getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map) override;

View File

@ -53,7 +53,7 @@ void DiskObjectStorageMetadata::deserialize(ReadBuffer & buf)
object_relative_path = object_relative_path.substr(object_storage_root_path.size());
}
assertChar('\n', buf);
storage_objects[i].path = object_relative_path;
storage_objects[i].relative_path = object_relative_path;
storage_objects[i].bytes_size = object_size;
}
@ -122,7 +122,7 @@ DiskObjectStorageMetadata::DiskObjectStorageMetadata(
void DiskObjectStorageMetadata::addObject(const String & path, size_t size)
{
if (!remote_fs_root_path.empty() && path.starts_with(remote_fs_root_path))
if (!object_storage_root_path.empty() && path.starts_with(object_storage_root_path))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected relative path");
total_size += size;

View File

@ -24,10 +24,12 @@ static String revisionToString(UInt64 revision)
return std::bitset<64>(revision).to_string();
}
void DiskObjectStorageRemoteMetadataRestoreHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
void DiskObjectStorageRemoteMetadataRestoreHelper::createFileOperationObject(
const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const
{
const String path = disk->object_storage_root_path + "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
auto buf = disk->object_storage->writeObject(path, WriteMode::Rewrite, metadata);
const String relative_path = "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name;
auto object = disk->metadata_storage->createStorageObject(relative_path);
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite, metadata);
buf->write('0');
buf->finalize();
}
@ -43,9 +45,12 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::findLastRevision()
LOG_TRACE(disk->log, "Check object exists with revision prefix {}", revision_prefix);
const auto & object_storage = disk->object_storage;
StoredObject revision_object{disk->object_storage_root_path + "r" + revision_prefix};
StoredObject revision_operation_object{disk->object_storage_root_path + "operations/r" + revision_prefix};
/// Check file or operation with such revision prefix exists.
if (disk->object_storage->exists(disk->object_storage_root_path + "r" + revision_prefix)
|| disk->object_storage->exists(disk->object_storage_root_path + "operations/r" + revision_prefix))
if (object_storage->exists(revision_object) || object_storage->exists(revision_operation_object))
revision += "1";
else
revision += "0";
@ -69,9 +74,10 @@ int DiskObjectStorageRemoteMetadataRestoreHelper::readSchemaVersion(IObjectStora
void DiskObjectStorageRemoteMetadataRestoreHelper::saveSchemaVersion(const int & version) const
{
auto path = disk->object_storage_root_path + SCHEMA_VERSION_OBJECT;
auto path = fs::path(disk->object_storage_root_path) / SCHEMA_VERSION_OBJECT;
StoredObject object{path};
auto buf = disk->object_storage->writeObject(path, WriteMode::Rewrite);
auto buf = disk->object_storage->writeObject(object, WriteMode::Rewrite);
writeIntText(version, *buf);
buf->finalize();
@ -86,13 +92,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
{
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
auto objects = disk->metadata_storage->getObjectStoragePaths(path);
for (const auto & [object_path, _] : objects)
auto objects = disk->metadata_storage->getStorageObjects(path);
for (const auto & object : objects)
{
ObjectAttributes metadata {
{"path", path}
};
updateObjectMetadata(object_path, metadata);
updateObjectMetadata(object.path, metadata);
}
}
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
@ -350,24 +356,24 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
std::vector<std::future<void>> results;
auto restore_files = [this, &source_object_storage, &restore_information, &results](const PathsWithSize & keys)
auto restore_files = [this, &source_object_storage, &restore_information, &results](const RelativePathsWithSize & objects)
{
std::vector<String> keys_names;
for (const auto & [key, size] : keys)
for (const auto & object : objects)
{
LOG_INFO(disk->log, "Calling restore for key for disk {}", key);
LOG_INFO(disk->log, "Calling restore for key for disk {}", object.relative_path);
/// Skip file operations objects. They will be processed separately.
if (key.find("/operations/") != String::npos)
if (object.relative_path.find("/operations/") != String::npos)
continue;
const auto [revision, _] = extractRevisionAndOperationFromKey(key);
const auto [revision, _] = extractRevisionAndOperationFromKey(object.relative_path);
/// Filter early if it's possible to get revision from key.
if (revision > restore_information.revision)
continue;
keys_names.push_back(key);
keys_names.push_back(object.relative_path);
}
if (!keys_names.empty())
@ -424,10 +430,12 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::processRestoreFiles(
disk->createDirectories(directoryPath(path));
auto relative_key = shrinkKey(source_path, key);
auto full_path = fs::path(disk->object_storage_root_path) / relative_key;
StoredObject object{full_path};
/// Copy object if we restore to different bucket / path.
if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->object_storage_root_path != source_path)
source_object_storage->copyObjectToAnotherObjectStorage(key, disk->object_storage_root_path + relative_key, *disk->object_storage);
source_object_storage->copyObjectToAnotherObjectStorage(key, object, *disk->object_storage);
auto tx = disk->metadata_storage->createTransaction();
tx->addBlobToMetadata(path, relative_key, meta.size_bytes);
@ -462,17 +470,17 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
|| disk->object_storage_root_path != restore_information.source_path;
std::set<String> renames;
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const PathsWithSize & keys)
auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const RelativePathsWithSize & objects)
{
const String rename = "rename";
const String hardlink = "hardlink";
for (const auto & [key, _]: keys)
for (const auto & object : objects)
{
const auto [revision, operation] = extractRevisionAndOperationFromKey(key);
const auto [revision, operation] = extractRevisionAndOperationFromKey(object.relative_path);
if (revision == UNKNOWN_REVISION)
{
LOG_WARNING(disk->log, "Skip key {} with unknown revision", key);
LOG_WARNING(disk->log, "Skip key {} with unknown revision", object.relative_path);
continue;
}
@ -485,7 +493,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFileOperations(IObject
if (send_metadata)
revision_counter = revision - 1;
auto object_attributes = *(source_object_storage->getObjectMetadata(key).attributes);
auto object_attributes = *(source_object_storage->getObjectMetadata(object.relative_path).attributes);
if (operation == rename)
{
auto from_path = object_attributes["from_path"];

View File

@ -21,12 +21,10 @@ namespace ErrorCodes
DiskObjectStorageTransaction::DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & remote_fs_root_path_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_)
: object_storage(object_storage_)
, metadata_storage(metadata_storage_)
, metadata_transaction(metadata_storage.createTransaction())
, remote_fs_root_path(remote_fs_root_path_)
, metadata_helper(metadata_helper_)
{}
@ -64,8 +62,7 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
bool delete_metadata_only;
bool remove_from_cache{false};
PathsWithSize paths_to_remove;
StoredObjects objects_to_remove;
bool if_exists;
RemoveObjectStorageOperation(
@ -96,20 +93,12 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
try
{
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path);
auto objects = metadata_storage.getObjectStoragePaths(path);
// String full_path = fs::path(metadata_storage.getPath()) / path;
// bool is_remote = object_storage.isRemote();
// if (!is_remote)
// object_storage.removeCacheIfExists(full_path);
auto objects = metadata_storage.getStorageObjects(path);
tx->unlinkMetadata(path);
if (hardlink_count == 0)
{
paths_to_remove = objects;
remove_from_cache = true;
}
objects_to_remove = objects;
}
catch (const Exception & e)
{
@ -133,25 +122,17 @@ struct RemoveObjectStorageOperation final : public IDiskObjectStorageOperation
void finalize() override
{
if (!delete_metadata_only && !paths_to_remove.empty() && object_storage.isRemote())
object_storage.removeObjects(paths_to_remove);
if (remove_from_cache)
{
for (const auto & path_to_remove : paths_to_remove)
object_storage.removeCacheIfExists(path_to_remove.path);
}
if (!delete_metadata_only && !objects_to_remove.empty())
object_storage.removeObjects(objects_to_remove);
}
};
struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string path;
std::unordered_map<std::string, PathsWithSize> paths_to_remove;
std::unordered_map<std::string, StoredObjects> objects_to_remove;
bool keep_all_batch_data;
NameSet file_names_remove_metadata_only;
PathsWithSize path_to_remove_from_cache;
RemoveRecursiveObjectStorageOperation(
IObjectStorage & object_storage_,
@ -174,15 +155,12 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
try
{
uint32_t hardlink_count = metadata_storage.getHardlinkCount(path_to_remove);
auto objects_paths = metadata_storage.getObjectStoragePaths(path_to_remove);
auto objects_paths = metadata_storage.getStorageObjects(path_to_remove);
tx->unlinkMetadata(path_to_remove);
if (hardlink_count == 0)
{
paths_to_remove[path_to_remove] = objects_paths;
path_to_remove_from_cache.insert(path_to_remove_from_cache.end(), objects_paths.begin(), objects_paths.end());
}
objects_to_remove[path_to_remove] = objects_paths;
}
catch (const Exception & e)
@ -220,10 +198,10 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
void finalize() override
{
if (!keep_all_batch_data && object_storage.isRemote())
if (!keep_all_batch_data)
{
PathsWithSize remove_from_remote;
for (auto && [local_path, remote_paths] : paths_to_remove)
StoredObjects remove_from_remote;
for (auto && [local_path, remote_paths] : objects_to_remove)
{
if (!file_names_remove_metadata_only.contains(fs::path(local_path).filename()))
{
@ -232,9 +210,6 @@ struct RemoveRecursiveObjectStorageOperation final : public IDiskObjectStorageOp
}
object_storage.removeObjects(remove_from_remote);
}
for (const auto & [path_to_remove, _] : path_to_remove_from_cache)
object_storage.removeCacheIfExists(path_to_remove);
}
};
@ -243,7 +218,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
{
std::string path_from;
std::string path_to;
PathsWithSize blobs_to_remove;
StoredObjects objects_to_remove;
ReplaceFileObjectStorageOperation(
IObjectStorage & object_storage_,
@ -259,7 +234,7 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
{
if (metadata_storage.exists(path_to))
{
blobs_to_remove = metadata_storage.getObjectStoragePaths(path_to);
objects_to_remove = metadata_storage.getStorageObjects(path_to);
tx->replaceFile(path_from, path_to);
}
else
@ -273,21 +248,21 @@ struct ReplaceFileObjectStorageOperation final : public IDiskObjectStorageOperat
void finalize() override
{
if (!blobs_to_remove.empty())
object_storage.removeObjects(blobs_to_remove);
if (!objects_to_remove.empty())
object_storage.removeObjects(objects_to_remove);
}
};
struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
std::string blob_path;
StoredObject object;
WriteFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & blob_path_)
const StoredObject & object_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, blob_path(blob_path_)
, object(object_)
{}
void execute(MetadataTransactionPtr) override
@ -297,8 +272,8 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
void undo() override
{
if (object_storage.exists(blob_path))
object_storage.removeObject(blob_path);
if (object_storage.exists(object))
object_storage.removeObject(object);
}
void finalize() override
@ -309,47 +284,44 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
{
/// Local paths
std::string from_path;
std::string to_path;
std::string remote_fs_root_path;
std::vector<std::string> created_blobs;
StoredObjects created_objects;
CopyFileObjectStorageOperation(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & from_path_,
const std::string & to_path_,
const std::string & remote_fs_root_path_)
const std::string & to_path_)
: IDiskObjectStorageOperation(object_storage_, metadata_storage_)
, from_path(from_path_)
, to_path(to_path_)
, remote_fs_root_path(remote_fs_root_path_)
{}
void execute(MetadataTransactionPtr tx) override
{
tx->createEmptyMetadataFile(to_path);
auto source_blobs = metadata_storage.getObjectStoragePaths(from_path); /// Full paths
auto source_blobs = metadata_storage.getStorageObjects(from_path); /// Full paths
for (const auto & [blob_from, size] : source_blobs)
for (const auto & [blob_from, size, _] : source_blobs)
{
auto blob_name = object_storage.generateBlobNameForPath(to_path);
auto object_to = metadata_storage.createStorageObject(blob_name);
auto blob_to = fs::path(remote_fs_root_path) / blob_name;
object_storage.copyObject(blob_from, blob_to);
object_storage.copyObject(blob_from, object_to.path);
tx->addBlobToMetadata(to_path, blob_name, size);
created_blobs.push_back(blob_to);
created_objects.push_back(object_to);
}
}
void undo() override
{
for (const auto & blob_path : created_blobs)
object_storage.removeObject(blob_path);
for (const auto & object : created_objects)
object_storage.removeObject(object);
}
void finalize() override
@ -487,6 +459,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
bool autocommit)
{
auto blob_name = object_storage.generateBlobNameForPath(path);
auto object = metadata_storage.createStorageObject(blob_name);
std::optional<ObjectAttributes> object_attributes;
if (metadata_helper)
@ -499,9 +472,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
blob_name = "r" + revisionToString(revision) + "-file-" + blob_name;
}
auto blob_path = fs::path(remote_fs_root_path) / blob_name;
auto create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name, autocommit] (size_t count)
auto create_metadata_callback = [tx = shared_from_this(), mode, path, blob_name=blob_name, autocommit] (size_t count)
{
if (mode == WriteMode::Rewrite)
tx->metadata_transaction->createMetadataFile(path, blob_name, count);
@ -512,13 +483,16 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
tx->metadata_transaction->commit();
};
operations_to_execute.emplace_back(std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, blob_path));
operations_to_execute.emplace_back(std::make_unique<WriteFileObjectStorageOperation>(object_storage, metadata_storage, object));
/// We always use mode Rewrite because we simulate append using metadata and different files
return object_storage.writeObject(
blob_path, WriteMode::Rewrite, object_attributes,
object,
WriteMode::Rewrite,
object_attributes,
std::move(create_metadata_callback),
buf_size, settings);
buf_size,
settings);
}
@ -560,7 +534,8 @@ void DiskObjectStorageTransaction::createFile(const std::string & path)
void DiskObjectStorageTransaction::copyFile(const std::string & from_file_path, const std::string & to_file_path)
{
operations_to_execute.emplace_back(std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path, remote_fs_root_path));
operations_to_execute.emplace_back(
std::make_unique<CopyFileObjectStorageOperation>(object_storage, metadata_storage, from_file_path, to_file_path));
}
void DiskObjectStorageTransaction::commit()

View File

@ -54,7 +54,6 @@ private:
IMetadataStorage & metadata_storage;
MetadataTransactionPtr metadata_transaction;
/// TODO we can get rid of this params
const std::string & remote_fs_root_path;
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper;
DiskObjectStorageOperations operations_to_execute;
@ -62,7 +61,6 @@ public:
DiskObjectStorageTransaction(
IObjectStorage & object_storage_,
IMetadataStorage & metadata_storage_,
const std::string & remote_fs_root_path_,
DiskObjectStorageRemoteMetadataRestoreHelper * metadata_helper_);
void commit() override;

View File

@ -38,35 +38,36 @@ std::string HDFSObjectStorage::generateBlobNameForPath(const std::string & /* pa
return getRandomASCIIString();
}
bool HDFSObjectStorage::exists(const std::string & hdfs_uri) const
bool HDFSObjectStorage::exists(const StoredObject & object) const
{
const size_t begin_of_path = hdfs_uri.find('/', hdfs_uri.find("//") + 2);
const String remote_fs_object_path = hdfs_uri.substr(begin_of_path);
const auto & path = object.path;
const size_t begin_of_path = path.find('/', path.find("//") + 2);
const String remote_fs_object_path = path.substr(begin_of_path);
return (0 == hdfsExists(hdfs_fs.get(), remote_fs_object_path.c_str()));
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
return std::make_unique<ReadBufferFromHDFS>(path, path, config, read_settings);
return std::make_unique<ReadBufferFromHDFS>(object.path, object.path, config, read_settings);
}
std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, paths_to_read, read_settings);
auto hdfs_impl = std::make_unique<ReadBufferFromHDFSGather>(config, objects, read_settings);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl));
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
@ -80,10 +81,10 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
path, config, settings->replication, buf_size,
object.path, config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), path);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), object.path);
}
@ -100,8 +101,9 @@ void HDFSObjectStorage::listPrefix(const std::string & path, RelativePathsWithSi
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void HDFSObjectStorage::removeObject(const std::string & path)
void HDFSObjectStorage::removeObject(const StoredObject & object)
{
const auto & path = object.path;
const size_t begin_of_path = path.find('/', path.find("//") + 2);
/// Add path from root to file name
@ -111,22 +113,22 @@ void HDFSObjectStorage::removeObject(const std::string & path)
}
void HDFSObjectStorage::removeObjects(const PathsWithSize & paths)
void HDFSObjectStorage::removeObjects(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeObject(path);
for (const auto & object : objects)
removeObject(object.path);
}
void HDFSObjectStorage::removeObjectIfExists(const std::string & path)
void HDFSObjectStorage::removeObjectIfExists(const StoredObject & object)
{
if (exists(path))
removeObject(path);
if (exists(object))
removeObject(object);
}
void HDFSObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
void HDFSObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeObjectIfExists(path);
for (const auto & object : objects)
removeObjectIfExists(object.path);
}
ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
@ -137,8 +139,8 @@ ObjectMetadata HDFSObjectStorage::getObjectMetadata(const std::string &) const
}
void HDFSObjectStorage::copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes)
{
if (object_to_attributes.has_value())

View File

@ -48,26 +48,25 @@ public:
, hdfs_builder(createHDFSBuilder(hdfs_root_path_, config))
, hdfs_fs(createHDFSFS(hdfs_builder.get()))
, settings(std::move(settings_))
, hdfs_root_path(hdfs_root_path_)
{}
bool exists(const std::string & hdfs_uri) const override;
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
@ -77,19 +76,19 @@ public:
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const std::string & path) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const PathsWithSize & paths) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const PathsWithSize & paths) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override;

View File

@ -87,6 +87,9 @@ class IMetadataStorage : private boost::noncopyable
{
friend class MetadataStorageFromDiskTransaction;
protected:
mutable std::shared_mutex metadata_mutex;
public:
virtual MetadataTransactionPtr createTransaction() const = 0;
@ -118,17 +121,17 @@ public:
/// ==== More specefic methods. Previous were almost general purpose. ====
virtual DiskPtr getDisk() const = 0;
/// Read multiple metadata files into strings and return mapping from file_path -> metadata
virtual std::unordered_map<std::string, std::string> getSerializedMetadata(const std::vector<String> & file_paths) const = 0;
/// Return [(object_storage_path, size_in_bytes), ...] for metadata path
/// object_storage_path is a full path to the blob.
virtual PathsWithSize getObjectStoragePaths(const std::string & path) const = 0;
/// Return [(object_storage_path, size_in_bytes), ...] for metadata path.
/// object_storage_path is absolute.
virtual StoredObjects getStorageObjects(const std::string & path) const = 0;
virtual DiskPtr getDisk() const = 0;
protected:
mutable std::shared_mutex metadata_mutex;
/// Creates StoredObject object by blob_name.
virtual StoredObject createStorageObject(const std::string & blob_name) const = 0;
};
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;

View File

@ -27,7 +27,11 @@ ThreadPool & IObjectStorage::getThreadPoolWriter()
return writer;
}
void IObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes) // NOLINT
void IObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes)
{
if (&object_storage_to == this)
copyObject(object_from, object_to, object_to_attributes);
@ -43,4 +47,20 @@ const std::string & IObjectStorage::getCacheBasePath() const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheBasePath() is not implemented for object storage");
}
StoredObject::StoredObject(
const std::string & path_,
uint64_t bytes_size_,
std::function<String(const String &)> && cache_hint_creator_)
: path(path_)
, bytes_size(bytes_size_)
, cache_hint_creator(std::move(cache_hint_creator_))
{}
std::string StoredObject::getCacheHint() const
{
if (cache_hint_creator)
return cache_hint_creator(path);
return "";
}
}

View File

@ -24,24 +24,38 @@ class WriteBufferFromFileBase;
using ObjectAttributes = std::map<std::string, std::string>;
/// Path to a file and its size.
/// Path can be either relative or absolute - according to the context of use.
struct PathWithSize
struct RelativePathWithSize
{
std::string path;
String relative_path;
size_t bytes_size;
RelativePathWithSize() = default;
RelativePathWithSize(const String & relative_path_, size_t bytes_size_)
: relative_path(relative_path_), bytes_size(bytes_size_) {}
};
using RelativePathsWithSize = std::vector<RelativePathWithSize>;
/// Object metadata: path, size. cache_hint.
struct StoredObject
{
std::string path; /// absolute
uint64_t bytes_size;
PathWithSize() = default;
/// Optional cache hint for cache. Use delayed initialization
/// because somecache hint implementation requires it.
using CacheHintCreator = std::function<std::string(const std::string &)>;
CacheHintCreator cache_hint_creator;
PathWithSize(const std::string & path_, uint64_t bytes_size_)
: path(path_)
, bytes_size(bytes_size_)
{}
StoredObject() = default;
StoredObject(const std::string & path_, uint64_t bytes_size_ = 0, CacheHintCreator && cache_hint_creator_ = {});
std::string getCacheHint() const;
};
/// List of paths with their sizes
using PathsWithSize = std::vector<PathWithSize>;
using RelativePathsWithSize = PathsWithSize;
using StoredObjects = std::vector<StoredObject>;
struct ObjectMetadata
{
@ -60,8 +74,8 @@ class IObjectStorage
public:
IObjectStorage() = default;
/// Path exists or not
virtual bool exists(const std::string & path) const = 0;
/// Object exists or not
virtual bool exists(const StoredObject & object) const = 0;
/// List on prefix, return children (relative paths) with their sizes.
virtual void listPrefix(const std::string & path, RelativePathsWithSize & children) const = 0;
@ -70,23 +84,23 @@ public:
/// at least size of object
virtual ObjectMetadata getObjectMetadata(const std::string & path) const = 0;
/// Read single path from object storage
/// Read single object
virtual std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Read multiple objects with common prefix
virtual std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const = 0;
/// Open the file for write and return WriteBufferFromFileBase object.
virtual std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
@ -96,30 +110,30 @@ public:
virtual bool isRemote() const = 0;
/// Remove object. Throws exception if object doesn't exists.
virtual void removeObject(const std::string & path) = 0;
virtual void removeObject(const StoredObject & object) = 0;
/// Remove multiple objects. Some object storages can do batch remove in a more
/// optimal way.
virtual void removeObjects(const PathsWithSize & paths) = 0;
virtual void removeObjects(const StoredObjects & objects) = 0;
/// Remove object on path if exists
virtual void removeObjectIfExists(const std::string & path) = 0;
virtual void removeObjectIfExists(const StoredObject & object) = 0;
/// Remove objects on path if exists
virtual void removeObjectsIfExist(const PathsWithSize & paths) = 0;
virtual void removeObjectsIfExist(const StoredObjects & object) = 0;
/// Copy object with different attributes if required
virtual void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) = 0;
/// Copy object to another instance of object storage
/// by default just read the object from source object storage and write
/// to destination through buffers.
virtual void copyObjectToAnotherObjectStorage( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {});
@ -159,7 +173,7 @@ public:
virtual bool supportsAppend() const { return false; }
virtual void removeCacheIfExists(const std::string & /* path */) {}
virtual void removeCacheIfExists(const std::string & /* path_hint_for_cache */) {}
};
using ObjectStoragePtr = std::shared_ptr<IObjectStorage>;

View File

@ -30,29 +30,31 @@ LocalObjectStorage::LocalObjectStorage()
{
}
bool LocalObjectStorage::exists(const std::string & path) const
bool LocalObjectStorage::exists(const StoredObject & object) const
{
return fs::exists(path);
return fs::exists(object.path);
}
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
if (paths_to_read.size() != 1)
if (objects.size() != 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "LocalObjectStorage support read only from single object");
return readObject(paths_to_read[0].path, read_settings, read_hint, file_size);
return readObject(objects[0], read_settings, read_hint, file_size);
}
std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
const auto & path = object.path;
if (!file_size.has_value())
file_size = getFileSizeIfPossible(path);
@ -64,48 +66,53 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObject( /// NOLI
}
std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode, // Local doesn't support append, only rewrite
std::optional<ObjectAttributes> /* attributes */,
FinalizeCallback && /* finalize_callback */,
size_t buf_size,
const WriteSettings & /* write_settings */)
{
const auto & path = object.path;
int flags = (mode == WriteMode::Append) ? (O_APPEND | O_CREAT | O_WRONLY) : -1;
LOG_TEST(log, "Write object: {}", path);
return std::make_unique<WriteBufferFromFile>(path, buf_size, flags);
}
void LocalObjectStorage::listPrefix(const std::string & path, PathsWithSize & children) const
void LocalObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
{
fs::directory_iterator end_it;
for (auto it = fs::directory_iterator(path); it != end_it; ++it)
children.emplace_back(it->path().filename(), it->file_size());
}
void LocalObjectStorage::removeObject(const std::string & path)
void LocalObjectStorage::removeObject(const StoredObject & object)
{
auto fs_path = fs::path(path) / path;
/// For local object storage files are actually removed when "metadata" is removed.
if (!exists(object))
return;
auto fs_path = fs::path(object.path) / object.path;
if (0 != unlink(fs_path.c_str()))
throwFromErrnoWithPath("Cannot unlink file " + fs_path.string(), fs_path, ErrorCodes::CANNOT_UNLINK);
}
void LocalObjectStorage::removeObjects(const PathsWithSize & paths)
void LocalObjectStorage::removeObjects(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeObject(path);
for (const auto & object : objects)
removeObject(object);
}
void LocalObjectStorage::removeObjectIfExists(const std::string & path)
void LocalObjectStorage::removeObjectIfExists(const StoredObject & object)
{
if (exists(path))
removeObject(path);
if (exists(object))
removeObject(object);
}
void LocalObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
void LocalObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
for (const auto & [path, _] : paths)
removeObjectIfExists(path);
for (const auto & object : objects)
removeObjectIfExists(object);
}
ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path */) const
@ -113,22 +120,13 @@ ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Metadata is not supported for LocalObjectStorage");
}
void LocalObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const std::string & /* object_from */,
const std::string & /* object_to */,
IObjectStorage & /* object_storage_to */,
std::optional<ObjectAttributes> /* object_to_attributes */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "cloneObjectStorage() is not implemented for LocalObjectStorage");
}
void LocalObjectStorage::copyObject( // NOLINT
const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> /* object_to_attributes */)
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> /* object_to_attributes */)
{
fs::path to = object_to;
fs::path from = object_from;
fs::path to = object_to.path;
fs::path from = object_from.path;
if (object_from.ends_with('/'))
if (object_from.path.ends_with('/'))
from = from.parent_path();
if (fs::is_directory(from))
to /= from.filename();

View File

@ -17,50 +17,44 @@ class LocalObjectStorage : public IObjectStorage
public:
LocalObjectStorage();
bool exists(const std::string & path) const override;
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void listPrefix(const std::string & path, PathsWithSize & children) const override;
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
void removeObject(const std::string & path) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const PathsWithSize & paths) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const PathsWithSize & paths) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void copyObjectToAnotherObjectStorage( /// NOLINT
const std::string & object_from,
const std::string & object_to,
IObjectStorage & object_storage_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void shutdown() override;

View File

@ -1,6 +1,7 @@
#include "MetadataStorageFromLocalDisk.h"
#include <Disks/IDisk.h>
#include <Common/filesystemHelpers.h>
#include <Common/logger_useful.h>
#include <IO/WriteHelpers.h>
@ -12,9 +13,13 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
MetadataStorageFromLocalDisk::MetadataStorageFromLocalDisk(DiskPtr disk_, ObjectStoragePtr object_storage_)
MetadataStorageFromLocalDisk::MetadataStorageFromLocalDisk(
DiskPtr disk_,
ObjectStoragePtr object_storage_,
const std::string & object_storage_root_path_)
: disk(disk_)
, object_storage(object_storage_)
, object_storage_root_path(object_storage_root_path_)
{
}
@ -85,10 +90,48 @@ std::unordered_map<String, String> MetadataStorageFromLocalDisk::getSerializedMe
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getSerializedMetadata is not implemented for MetadataStorageFromLocalDisk");
}
PathsWithSize MetadataStorageFromLocalDisk::getObjectStoragePaths(const std::string & path) const
StoredObjects MetadataStorageFromLocalDisk::getStorageObjects(const std::string & path) const
{
auto full_path = fs::path(getPath()) / path;
return {PathWithSize{toString(getINodeNumberFromPath(full_path)), getFileSize(full_path)}};
auto blob_name = object_storage->generateBlobNameForPath(path);
auto object = createStorageObject(path);
return {std::move(object)};
}
StoredObject MetadataStorageFromLocalDisk::createStorageObject(const std::string & blob_name) const
{
auto blob_path = fs::path(object_storage_root_path) / blob_name;
StoredObject::CacheHintCreator cache_hint_creator;
size_t object_size = 0;
if (exists(blob_path))
{
object_size = getFileSize(blob_path);
cache_hint_creator = [cache_hint = toString(getINodeNumberFromPath(blob_path))](const String &)
{
return cache_hint;
};
}
else
{
cache_hint_creator = [](const String & blob_path_) -> String
{
try
{
return toString(getINodeNumberFromPath(blob_path_));
}
catch (...)
{
LOG_DEBUG(
&Poco::Logger::get("MetadataStorageFromLocalDisk"),
"Object does not exist while getting cache path hint (object path: {})",
blob_path_);
return "";
}
};
}
return {blob_path, object_size, std::move(cache_hint_creator)};
}
uint32_t MetadataStorageFromLocalDisk::getHardlinkCount(const std::string & path) const

View File

@ -12,7 +12,10 @@ class MetadataStorageFromLocalDisk : public IMetadataStorage
{
public:
explicit MetadataStorageFromLocalDisk(DiskPtr disk_, ObjectStoragePtr object_storage_);
explicit MetadataStorageFromLocalDisk(
DiskPtr disk_,
ObjectStoragePtr object_storage_,
const std::string & object_storage_root_path_);
MetadataTransactionPtr createTransaction() const override;
@ -38,15 +41,18 @@ public:
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
PathsWithSize getObjectStoragePaths(const std::string & path) const override;
uint32_t getHardlinkCount(const std::string & path) const override;
DiskPtr getDisk() const override { return disk; }
StoredObjects getStorageObjects(const std::string & path) const override;
StoredObject createStorageObject(const std::string & blob_name) const override;
private:
DiskPtr disk;
ObjectStoragePtr object_storage;
std::string object_storage_root_path;
};
class MetadataStorageFromLocalDiskTransaction final : public MetadataStorageFromDiskTransaction

View File

@ -71,7 +71,7 @@ std::string MetadataStorageFromRemoteDisk::readFileToString(const std::string &
DiskObjectStorageMetadataPtr MetadataStorageFromRemoteDisk::readMetadataUnlocked(const std::string & path, std::shared_lock<std::shared_mutex> &) const
{
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), root_path_for_remote_metadata, path);
auto metadata = std::make_unique<DiskObjectStorageMetadata>(disk->getPath(), object_storage_root_path, path);
auto str = readFileToString(path);
metadata->deserializeFromString(str);
return metadata;
@ -116,20 +116,32 @@ MetadataTransactionPtr MetadataStorageFromRemoteDisk::createTransaction() const
return std::make_shared<MetadataStorageFromRemoteDiskTransaction>(*this);
}
PathsWithSize MetadataStorageFromRemoteDisk::getObjectStoragePaths(const std::string & path) const
StoredObjects MetadataStorageFromRemoteDisk::getStorageObjects(const std::string & path) const
{
auto metadata = readMetadata(path);
PathsWithSize object_storage_paths = metadata->getBlobs(); /// Relative paths.
auto root_path = metadata->getBlobsCommonPrefix();
auto object_storage_relative_paths = metadata->getBlobsRelativePaths(); /// Relative paths.
StoredObjects object_storage_paths;
object_storage_paths.reserve(object_storage_relative_paths.size());
/// Relative paths -> absolute.
for (auto & [object_path, _] : object_storage_paths)
object_path = fs::path(root_path) / object_path;
for (auto & [object_relative_path, size] : object_storage_relative_paths)
{
auto object_path = fs::path(metadata->getBlobsCommonPrefix()) / object_relative_path;
StoredObject object{ object_path, size, [](const String & path_){ return path_; }};
object_storage_paths.push_back(object);
}
return object_storage_paths;
}
StoredObject MetadataStorageFromRemoteDisk::createStorageObject(const std::string & blob_name) const
{
auto object_path = fs::path(object_storage_root_path) / blob_name;
return { object_path, 0, [](const String & path){ return path; }};
}
uint32_t MetadataStorageFromRemoteDisk::getHardlinkCount(const std::string & path) const
{
auto metadata = readMetadata(path);

View File

@ -16,12 +16,12 @@ private:
friend class MetadataStorageFromRemoteDiskTransaction;
DiskPtr disk;
std::string root_path_for_remote_metadata;
std::string object_storage_root_path;
public:
MetadataStorageFromRemoteDisk(DiskPtr disk_, const std::string & root_path_for_remote_metadata_)
MetadataStorageFromRemoteDisk(DiskPtr disk_, const std::string & object_storage_root_path_)
: disk(disk_)
, root_path_for_remote_metadata(root_path_for_remote_metadata_)
, object_storage_root_path(object_storage_root_path_)
{
}
@ -49,14 +49,16 @@ public:
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override;
PathsWithSize getObjectStoragePaths(const std::string & path) const override;
uint32_t getHardlinkCount(const std::string & path) const override;
std::string getMetadataPath() const { return root_path_for_remote_metadata; }
std::string getMetadataPath() const { return object_storage_root_path; }
DiskPtr getDisk() const override { return disk; }
StoredObjects getStorageObjects(const std::string & path) const override;
StoredObject createStorageObject(const std::string & blob_name) const override;
private:
DiskObjectStorageMetadataPtr readMetadata(const std::string & path) const;

View File

@ -27,6 +27,7 @@
#include <Common/IFileCache.h>
#include <Common/FileCacheFactory.h>
#include <Common/getRandomASCIIString.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -100,9 +101,9 @@ Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const s
return client_ptr->HeadObject(request);
}
bool S3ObjectStorage::exists(const std::string & path) const
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto object_head = requestObjectHeadData(bucket, path);
auto object_head = requestObjectHeadData(bucket, object.path);
if (!object_head.IsSuccess())
{
if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND)
@ -115,7 +116,7 @@ bool S3ObjectStorage::exists(const std::string & path) const
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
@ -126,7 +127,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
client.get(),
bucket,
version_id,
paths_to_read,
objects,
settings_ptr->s3_settings.max_single_read_retries,
read_settings);
@ -143,21 +144,19 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings,
std::optional<size_t>,
std::optional<size_t>) const
{
auto settings_ptr = s3_settings.get();
///TODO: KSSENII
ReadSettings disk_read_settings{read_settings};
return std::make_unique<ReadBufferFromS3>(
client.get(), bucket, path, version_id, settings_ptr->s3_settings.max_single_read_retries, disk_read_settings);
client.get(), bucket, object.path, version_id, settings_ptr->s3_settings.max_single_read_retries, read_settings);
}
std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode, // S3 doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
@ -171,12 +170,12 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
path,
object.path,
settings_ptr->s3_settings,
attributes,
buf_size, threadPoolCallbackRunner(getThreadPoolWriter()));
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), path);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(s3_buffer), std::move(finalize_callback), object.path);
}
void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize & children) const
@ -208,7 +207,7 @@ void S3ObjectStorage::listPrefix(const std::string & path, RelativePathsWithSize
} while (outcome.GetResult().GetIsTruncated());
}
void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
auto client_ptr = client.get();
@ -218,7 +217,7 @@ void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
{
Aws::S3::Model::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(path);
request.SetKey(object.path);
auto outcome = client_ptr->DeleteObject(request);
throwIfUnexpectedError(outcome, if_exists);
@ -228,7 +227,7 @@ void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
/// TODO: For AWS we prefer to use multiobject operation even for single object
/// maybe we shouldn't?
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(path);
obj.SetKey(object.path);
Aws::S3::Model::Delete delkeys;
delkeys.SetObjects({obj});
Aws::S3::Model::DeleteObjectsRequest request;
@ -240,15 +239,15 @@ void S3ObjectStorage::removeObjectImpl(const std::string & path, bool if_exists)
}
}
void S3ObjectStorage::removeObjectsImpl(const PathsWithSize & paths, bool if_exists)
void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
{
if (paths.empty())
if (objects.empty())
return;
if (!s3_capabilities.support_batch_delete)
{
for (const auto & [path, _] : paths)
removeObjectImpl(path, if_exists);
for (const auto & object : objects)
removeObjectImpl(object, if_exists);
}
else
{
@ -258,19 +257,19 @@ void S3ObjectStorage::removeObjectsImpl(const PathsWithSize & paths, bool if_exi
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;
while (current_position < paths.size())
while (current_position < objects.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
String keys;
for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
for (; current_position < objects.size() && current_chunk.size() < chunk_size_limit; ++current_position)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(paths[current_position].path);
obj.SetKey(objects[current_position].path);
current_chunk.push_back(obj);
if (!keys.empty())
keys += ", ";
keys += paths[current_position].path;
keys += objects[current_position].path;
}
Aws::S3::Model::Delete delkeys;
@ -285,24 +284,24 @@ void S3ObjectStorage::removeObjectsImpl(const PathsWithSize & paths, bool if_exi
}
}
void S3ObjectStorage::removeObject(const std::string & path)
void S3ObjectStorage::removeObject(const StoredObject & object)
{
removeObjectImpl(path, false);
removeObjectImpl(object, false);
}
void S3ObjectStorage::removeObjectIfExists(const std::string & path)
void S3ObjectStorage::removeObjectIfExists(const StoredObject & object)
{
removeObjectImpl(path, true);
removeObjectImpl(object, true);
}
void S3ObjectStorage::removeObjects(const PathsWithSize & paths)
void S3ObjectStorage::removeObjects(const StoredObjects & objects)
{
removeObjectsImpl(paths, false);
removeObjectsImpl(objects, false);
}
void S3ObjectStorage::removeObjectsIfExist(const PathsWithSize & paths)
void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
removeObjectsImpl(paths, true);
removeObjectsImpl(objects, true);
}
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
@ -320,16 +319,24 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
return result;
}
void S3ObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional<ObjectAttributes> object_to_attributes) // NOLINT
void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes)
{
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
copyObjectImpl(bucket, object_from, dest_s3->bucket, object_to, {}, object_to_attributes);
copyObjectImpl(bucket, object_from.path, dest_s3->bucket, object_to.path, {}, object_to_attributes);
else
IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes);
}
void S3ObjectStorage::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
void S3ObjectStorage::copyObjectImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
std::optional<Aws::S3::Model::HeadObjectResult> head,
std::optional<ObjectAttributes> metadata) const
{
@ -432,13 +439,14 @@ void S3ObjectStorage::copyObjectMultipartImpl(const String & src_bucket, const S
}
}
void S3ObjectStorage::copyObject(const std::string & object_from, const std::string & object_to, std::optional<ObjectAttributes> object_to_attributes) // NOLINT
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
auto head = requestObjectHeadData(bucket, object_from).GetResult();
auto head = requestObjectHeadData(bucket, object_from.path).GetResult();
if (head.GetContentLength() >= static_cast<int64_t>(5UL * 1024 * 1024 * 1024))
copyObjectMultipartImpl(bucket, object_from, bucket, object_to, head, object_to_attributes);
copyObjectMultipartImpl(bucket, object_from.path, bucket, object_to.path, head, object_to_attributes);
else
copyObjectImpl(bucket, object_from, bucket, object_to, head, object_to_attributes);
copyObjectImpl(bucket, object_from.path, bucket, object_to.path, head, object_to_attributes);
}
void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)

View File

@ -55,23 +55,23 @@ public:
, version_id(std::move(version_id_))
{}
bool exists(const std::string & path) const override;
bool exists(const StoredObject & object) const override;
std::unique_ptr<ReadBufferFromFileBase> readObject( /// NOLINT
const std::string & path,
const StoredObject & object,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<ReadBufferFromFileBase> readObjects( /// NOLINT
const PathsWithSize & paths_to_read,
const StoredObjects & objects,
const ReadSettings & read_settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override;
/// Open the file for write and return WriteBufferFromFileBase object.
std::unique_ptr<WriteBufferFromFileBase> writeObject( /// NOLINT
const std::string & path,
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
@ -81,24 +81,24 @@ public:
void listPrefix(const std::string & path, RelativePathsWithSize & children) const override;
/// Remove file. Throws exception if file doesn't exist or it's a directory.
void removeObject(const std::string & path) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const PathsWithSize & paths) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const std::string & path) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const PathsWithSize & paths) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void copyObject( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
void copyObjectToAnotherObjectStorage( /// NOLINT
const std::string & object_from,
const std::string & object_to,
const StoredObject & object_from,
const StoredObject & object_to,
IObjectStorage & object_storage_to,
std::optional<ObjectAttributes> object_to_attributes = {}) override;
@ -113,12 +113,6 @@ public:
std::string getObjectsNamespace() const override { return bucket; }
std::unique_ptr<IObjectStorage> cloneObjectStorage(
const std::string & new_namespace,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context) override;
std::string generateBlobNameForPath(const std::string & path) override;
bool isRemote() const override { return true; }
@ -150,8 +144,8 @@ private:
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
std::optional<ObjectAttributes> metadata = std::nullopt) const;
void removeObjectImpl(const std::string & path, bool if_exists);
void removeObjectsImpl(const PathsWithSize & paths, bool if_exists);
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;