Trying to create metadata layer

This commit is contained in:
alesapin 2022-06-02 18:09:40 +02:00
parent a9af4a80b0
commit 46bccae078
29 changed files with 807 additions and 470 deletions

View File

@ -629,6 +629,7 @@
M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \
M(659, UNKNOWN_STATUS_OF_TRANSACTION) \
M(660, HDFS_ERROR) \
M(661, FS_METADATA_ERROR) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \

View File

@ -0,0 +1,31 @@
#pragma once
#include <string>
#include <memory>
namespace DB
{
/// Abstract cursor over the contents of a directory.
/// The interface only declares pure virtual accessors; implementations supply the actual listing.
class IDirectoryIterator
{
public:
    /// Virtual destructor so that implementations can be destroyed through a pointer to this interface.
    virtual ~IDirectoryIterator() = default;

    /// Move the iterator to the next file.
    virtual void next() = 0;

    /// Tell whether the iterator currently points to a valid element.
    virtual bool isValid() const = 0;

    /// Path of the file the iterator currently points to.
    virtual std::string path() const = 0;

    /// Name of the file the iterator currently points to.
    virtual std::string name() const = 0;
};
using DirectoryIteratorPtr = std::unique_ptr<IDirectoryIterator>;
}

View File

@ -83,7 +83,7 @@ void DiskDecorator::moveDirectory(const String & from_path, const String & to_pa
delegate->moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path)
{
return delegate->iterateDirectory(path);
}

View File

@ -28,7 +28,7 @@ public:
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;
@ -77,7 +77,7 @@ public:
std::vector<String> getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); }
void getRemotePathsRecursive(const String & path, std::vector<LocalPathWithRemotePaths> & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); }
DiskPtr getMetadataDiskIfExistsOrSelf() override { return delegate->getMetadataDiskIfExistsOrSelf(); }
MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); }
std::unordered_map<String, String> getSerializedMetadata(const std::vector<String> & file_paths) const override { return delegate->getSerializedMetadata(file_paths); }

View File

@ -83,7 +83,7 @@ public:
delegate->moveDirectory(wrapped_from_path, wrapped_to_path);
}
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override
DirectoryIteratorPtr iterateDirectory(const String & path) override
{
auto wrapped_path = wrappedPath(path);
return delegate->iterateDirectory(wrapped_path);

View File

@ -173,7 +173,7 @@ private:
};
class DiskLocalDirectoryIterator final : public IDiskDirectoryIterator
class DiskLocalDirectoryIterator final : public IDirectoryIterator
{
public:
DiskLocalDirectoryIterator() = default;
@ -325,7 +325,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path)
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}
DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
{
fs::path meta_path = fs::path(disk_path) / path;
if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path))

View File

@ -58,7 +58,7 @@ public:
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;

View File

@ -20,7 +20,7 @@ namespace ErrorCodes
}
class DiskMemoryDirectoryIterator final : public IDiskDirectoryIterator
class DiskMemoryDirectoryIterator final : public IDirectoryIterator
{
public:
explicit DiskMemoryDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
@ -262,7 +262,7 @@ void DiskMemory::moveDirectory(const String & /*from_path*/, const String & /*to
throw Exception("Method moveDirectory is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED);
}
DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path)
{
std::lock_guard lock(mutex);

View File

@ -52,7 +52,7 @@ public:
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;

View File

@ -171,7 +171,7 @@ void DiskRestartProxy::moveDirectory(const String & from_path, const String & to
DiskDecorator::moveDirectory(from_path, to_path);
}
DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path)
{
ReadLock lock (mutex);
return DiskDecorator::iterateDirectory(path);

View File

@ -37,7 +37,7 @@ public:
void createDirectories(const String & path) override;
void clearDirectory(const String & path) override;
void moveDirectory(const String & from_path, const String & to_path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void createFile(const String & path) override;
void moveFile(const String & from_path, const String & to_path) override;
void replaceFile(const String & from_path, const String & to_path) override;

View File

@ -95,7 +95,7 @@ void DiskWebServer::initialize(const String & uri_path) const
}
class DiskWebServerDirectoryIterator final : public IDiskDirectoryIterator
class DiskWebServerDirectoryIterator final : public IDirectoryIterator
{
public:
explicit DiskWebServerDirectoryIterator(std::vector<fs::path> && dir_file_paths_)
@ -188,7 +188,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskWebServer::readFile(const String & p
}
DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path)
{
std::vector<fs::path> dir_file_paths;
if (files.find(path) == files.end())

View File

@ -96,7 +96,7 @@ public:
bool isDirectory(const String & path) const override;
DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
DirectoryIteratorPtr iterateDirectory(const String & /* path */) override;
Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; }

View File

@ -6,6 +6,7 @@
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
namespace DB
{
@ -99,4 +100,6 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const
return nullptr;
}
MetadataStoragePtr IDisk::getMetadataStorage() { return std::make_shared<MetadataStorageFromDisk>(std::static_pointer_cast<IDisk>(shared_from_this())); }
}

View File

@ -12,6 +12,7 @@
#include <IO/WriteSettings.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/WriteMode.h>
#include <Disks/DirectoryIterator.h>
#include <memory>
#include <mutex>
@ -39,9 +40,6 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
class IDiskDirectoryIterator;
using DiskDirectoryIteratorPtr = std::unique_ptr<IDiskDirectoryIterator>;
class IReservation;
using ReservationPtr = std::unique_ptr<IReservation>;
using Reservations = std::vector<ReservationPtr>;
@ -49,6 +47,8 @@ using Reservations = std::vector<ReservationPtr>;
class ReadBufferFromFileBase;
class WriteBufferFromFileBase;
class MMappedFileCache;
class IMetadataStorage;
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
/**
@ -92,7 +92,10 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>()) : executor(std::move(executor_)) { }
explicit IDisk(std::unique_ptr<Executor> executor_ = std::make_unique<SyncExecutor>())
: executor(std::move(executor_))
{
}
/// Root path for all files stored on the disk.
/// It's not required to be a local filesystem path.
@ -135,7 +138,7 @@ public:
virtual void moveDirectory(const String & from_path, const String & to_path) = 0;
/// Return iterator to the contents of the specified directory.
virtual DiskDirectoryIteratorPtr iterateDirectory(const String & path) = 0;
virtual DirectoryIteratorPtr iterateDirectory(const String & path) = 0;
/// Return `true` if the specified directory is empty.
bool isDirectoryEmpty(const String & path);
@ -317,7 +320,7 @@ public:
/// Actually it's a part of the IDiskRemote implementation, but we have such a
/// complex hierarchy of disks (with decorators) that we cannot even
/// dynamic_cast a pointer to IDisk to a pointer to IDiskRemote.
virtual std::shared_ptr<IDisk> getMetadataDiskIfExistsOrSelf() { return std::static_pointer_cast<IDisk>(shared_from_this()); }
virtual MetadataStoragePtr getMetadataStorage();
/// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata"
/// it will return mapping for each required path: path -> metadata as string.
@ -364,27 +367,6 @@ private:
using DiskPtr = std::shared_ptr<IDisk>;
using Disks = std::vector<DiskPtr>;
/**
* Iterator of directory contents on particular disk.
*/
class IDiskDirectoryIterator
{
public:
/// Iterate to the next file.
virtual void next() = 0;
/// Return `true` if the iterator points to a valid element.
virtual bool isValid() const = 0;
/// Path to the file that the iterator currently points to.
virtual String path() const = 0;
/// Name of the file that the iterator currently points to.
virtual String name() const = 0;
virtual ~IDiskDirectoryIterator() = default;
};
/**
* Information about reserved size on particular disk.
*/

View File

@ -11,6 +11,7 @@
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
namespace DB
{
@ -82,11 +83,13 @@ void registerDiskAzureBlobStorage(DiskFactory & factory)
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk);
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
name,
/* no namespaces */"",
"DiskAzureBlobStorage",
metadata_disk,
std::move(metadata_storage),
std::move(azure_object_storage),
DiskType::AzureBlobStorage,
send_metadata,

View File

@ -89,7 +89,7 @@ DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & remote_fs_root_path_,
const String & log_name,
DiskPtr metadata_disk_,
MetadataStoragePtr && metadata_storage_,
ObjectStoragePtr && object_storage_,
DiskType disk_type_,
bool send_metadata_,
@ -98,8 +98,8 @@ DiskObjectStorage::DiskObjectStorage(
, name(name_)
, remote_fs_root_path(remote_fs_root_path_)
, log (&Poco::Logger::get(log_name))
, metadata_disk(metadata_disk_)
, disk_type(disk_type_)
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
, metadata_helper(std::make_unique<DiskObjectStorageMetadataHelper>(this, ReadSettings{}))
@ -107,7 +107,7 @@ DiskObjectStorage::DiskObjectStorage(
DiskObjectStorage::Metadata DiskObjectStorage::readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const
{
return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path);
return Metadata::readMetadata(remote_fs_root_path, metadata_storage, path);
}
@ -120,37 +120,37 @@ DiskObjectStorage::Metadata DiskObjectStorage::readMetadata(const String & path)
DiskObjectStorage::Metadata DiskObjectStorage::readUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater);
}
void DiskObjectStorage::readUpdateStoreMetadataAndRemove(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_storage, path, sync, updater);
}
DiskObjectStorage::Metadata DiskObjectStorage::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
if (mode == WriteMode::Rewrite || !metadata_disk->exists(path))
if (mode == WriteMode::Rewrite || !metadata_storage->exists(path))
{
std::unique_lock lock(metadata_mutex);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater);
}
else
{
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater);
}
}
DiskObjectStorage::Metadata DiskObjectStorage::createAndStoreMetadata(const String & path, bool sync)
{
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync);
return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync);
}
DiskObjectStorage::Metadata DiskObjectStorage::createUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater);
return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater);
}
std::vector<String> DiskObjectStorage::getRemotePaths(const String & local_path) const
@ -168,7 +168,7 @@ std::vector<String> DiskObjectStorage::getRemotePaths(const String & local_path)
void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector<LocalPathWithRemotePaths> & paths_map)
{
/// Protect against concurrent deletion of files (for example because of a merge).
if (metadata_disk->isFile(local_path))
if (metadata_storage->isFile(local_path))
{
try
{
@ -188,7 +188,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
}
else
{
DiskDirectoryIteratorPtr it;
DirectoryIteratorPtr it;
try
{
it = iterateDirectory(local_path);
@ -216,13 +216,13 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
bool DiskObjectStorage::exists(const String & path) const
{
return metadata_disk->exists(path);
return metadata_storage->exists(path);
}
bool DiskObjectStorage::isFile(const String & path) const
{
return metadata_disk->isFile(path);
return metadata_storage->isFile(path);
}
@ -255,7 +255,9 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
{
std::unique_lock lock(metadata_mutex);
metadata_disk->moveFile(from_path, to_path);
auto tx = metadata_storage->createTransaction();
metadata_storage->moveFile(from_path, to_path, tx);
tx->commit();
}
}
@ -351,7 +353,9 @@ void DiskObjectStorage::createHardLink(const String & src_path, const String & d
}
/// Create FS hardlink to metadata file.
metadata_disk->createHardLink(src_path, dst_path);
auto tx = metadata_storage->createTransaction();
metadata_storage->createHardLink(src_path, dst_path, tx);
tx->commit();
}
void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path)
@ -370,19 +374,23 @@ void DiskObjectStorage::setReadOnly(const String & path)
bool DiskObjectStorage::isDirectory(const String & path) const
{
return metadata_disk->isDirectory(path);
return metadata_storage->isDirectory(path);
}
void DiskObjectStorage::createDirectory(const String & path)
{
metadata_disk->createDirectory(path);
auto tx = metadata_storage->createTransaction();
metadata_storage->createDirectory(path, tx);
tx->commit();
}
void DiskObjectStorage::createDirectories(const String & path)
{
metadata_disk->createDirectories(path);
auto tx = metadata_storage->createTransaction();
metadata_storage->createDicrectoryRecursive(path, tx);
tx->commit();
}
@ -396,13 +404,15 @@ void DiskObjectStorage::clearDirectory(const String & path)
void DiskObjectStorage::removeDirectory(const String & path)
{
metadata_disk->removeDirectory(path);
auto tx = metadata_storage->createTransaction();
metadata_storage->removeDirectory(path, tx);
tx->commit();
}
DiskDirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path)
DirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path)
{
return metadata_disk->iterateDirectory(path);
return metadata_storage->iterateDirectory(path);
}
@ -415,23 +425,25 @@ void DiskObjectStorage::listFiles(const String & path, std::vector<String> & fil
void DiskObjectStorage::setLastModified(const String & path, const Poco::Timestamp & timestamp)
{
metadata_disk->setLastModified(path, timestamp);
auto tx = metadata_storage->createTransaction();
metadata_storage->setLastModified(path, timestamp, tx);
tx->commit();
}
Poco::Timestamp DiskObjectStorage::getLastModified(const String & path)
{
return metadata_disk->getLastModified(path);
return metadata_storage->getLastModified(path);
}
void DiskObjectStorage::removeMetadata(const String & path, std::vector<String> & paths_to_remove)
{
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path));
LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_storage->getPath() + path));
if (!metadata_disk->exists(path))
if (!metadata_storage->exists(path))
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path);
if (!metadata_disk->isFile(path))
if (!metadata_storage->isFile(path))
throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path);
try
@ -470,7 +482,9 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
backQuote(path), e.nested() ? e.nested()->message() : e.message());
std::unique_lock lock(metadata_mutex);
metadata_disk->removeFile(path);
auto tx = metadata_storage->createTransaction();
metadata_storage->unlinkFile(path, tx);
tx->commit();
}
else
throw;
@ -482,7 +496,7 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
if (metadata_disk->isFile(path))
if (metadata_storage->isFile(path))
{
removeMetadata(path, paths_to_remove[path]);
}
@ -491,7 +505,9 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde
for (auto it = iterateDirectory(path); it->isValid(); it->next())
removeMetadataRecursive(it->path(), paths_to_remove);
metadata_disk->removeDirectory(path);
auto tx = metadata_storage->createTransaction();
metadata_storage->removeDirectory(path, tx);
tx->commit();
}
}
@ -525,7 +541,7 @@ ReservationPtr DiskObjectStorage::reserve(UInt64 bytes)
void DiskObjectStorage::removeSharedFileIfExists(const String & path, bool delete_metadata_only)
{
std::vector<String> paths_to_remove;
if (metadata_disk->exists(path))
if (metadata_storage->exists(path))
{
removeMetadata(path, paths_to_remove);
if (!delete_metadata_only)

View File

@ -4,6 +4,7 @@
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h>
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <re2/re2.h>
namespace CurrentMetrics
@ -30,7 +31,7 @@ public:
const String & name_,
const String & remote_fs_root_path_,
const String & log_name,
DiskPtr metadata_disk_,
MetadataStoragePtr && mestata_storage_,
ObjectStoragePtr && object_storage_,
DiskType disk_type_,
bool send_metadata_,
@ -47,7 +48,7 @@ public:
const String & getName() const override { return name; }
const String & getPath() const override { return metadata_disk->getPath(); }
const String & getPath() const override { return metadata_storage->getPath(); }
std::vector<String> getRemotePaths(const String & local_path) const override;
@ -108,7 +109,7 @@ public:
void removeFromRemoteFS(const std::vector<String> & paths);
DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; }
MetadataStoragePtr getMetadataStorage() override { return metadata_storage; }
UInt32 getRefCount(const String & path) const override;
@ -141,7 +142,7 @@ public:
void removeDirectory(const String & path) override;
DiskDirectoryIteratorPtr iterateDirectory(const String & path) override;
DirectoryIteratorPtr iterateDirectory(const String & path) override;
void setLastModified(const String & path, const Poco::Timestamp & timestamp) override;
@ -180,9 +181,9 @@ private:
const String name;
const String remote_fs_root_path;
Poco::Logger * log;
DiskPtr metadata_disk;
const DiskType disk_type;
MetadataStoragePtr metadata_storage;
ObjectStoragePtr object_storage;
UInt64 reserved_bytes = 0;

View File

@ -16,51 +16,53 @@ namespace ErrorCodes
extern const int CANNOT_OPEN_FILE;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
result.load();
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync)
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
result.save(sync);
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
updater(result);
result.save(sync);
return result;
}
void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
/// Very often we are deleting metadata from some unfinished operation (like fetch of metadata).
/// In this case the metadata file can be incomplete, empty and so on. It's OK to remove it in this case
/// because we cannot do anything better.
try
{
DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
metadata.load();
if (updater(metadata))
metadata.save(sync);
metadata_disk_->removeFile(metadata_file_path_);
auto tx = metadata_storage_->createTransaction();
metadata_storage_->unlinkFile(metadata_file_path_, tx);
tx->commit();
}
catch (Exception & ex)
{
@ -71,7 +73,10 @@ void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String &
{
LOG_INFO(&Poco::Logger::get("ObjectStorageMetadata"), "Failed to read metadata file {} before removal because it's incomplete or empty. "
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", metadata_file_path_);
metadata_disk_->removeFile(metadata_file_path_);
auto tx = metadata_storage_->createTransaction();
metadata_storage_->unlinkFile(metadata_file_path_, tx);
tx->commit();
}
/// If the file is already removed, then there is nothing to do
@ -82,15 +87,15 @@ void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String &
}
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, bool overwrite)
{
if (overwrite || !metadata_disk_->exists(metadata_file_path_))
if (overwrite || !metadata_storage_->exists(metadata_file_path_))
{
return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync);
return createAndStoreMetadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_, sync);
}
else
{
auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
auto result = readMetadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_);
if (result.read_only)
throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED);
return result;
@ -99,8 +104,7 @@ DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNot
void DiskObjectStorageMetadata::load()
{
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
auto buf = metadata_storage->readFile(metadata_file_path, ReadSettings{}, 1024); /* reasonable buffer size for small file */
UInt32 version;
readIntText(version, *buf);
@ -109,7 +113,7 @@ void DiskObjectStorageMetadata::load()
throw Exception(
ErrorCodes::UNKNOWN_FORMAT,
"Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}",
metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
metadata_storage->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG));
assertChar('\n', *buf);
@ -132,7 +136,7 @@ void DiskObjectStorageMetadata::load()
if (!remote_fs_object_path.starts_with(remote_fs_root_path))
throw Exception(ErrorCodes::UNKNOWN_FORMAT,
"Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}",
remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath());
remote_fs_object_path, remote_fs_root_path, metadata_storage->getPath());
remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size());
}
@ -154,11 +158,11 @@ void DiskObjectStorageMetadata::load()
/// Load metadata by path or create empty if `create` flag is set.
DiskObjectStorageMetadata::DiskObjectStorageMetadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
MetadataStoragePtr metadata_storage_,
const String & metadata_file_path_)
: remote_fs_root_path(remote_fs_root_path_)
, metadata_file_path(metadata_file_path_)
, metadata_disk(metadata_disk_)
, metadata_storage(metadata_storage_)
, total_size(0), ref_count(0)
{
}
@ -203,8 +207,10 @@ void DiskObjectStorageMetadata::saveToBuffer(WriteBuffer & buf, bool sync)
/// Fsync metadata file if 'sync' flag is set.
void DiskObjectStorageMetadata::save(bool sync)
{
auto buf = metadata_disk->writeFile(metadata_file_path, 1024);
auto tx = metadata_storage->createTransaction();
auto buf = metadata_storage->writeFile(metadata_file_path, tx, 1024);
saveToBuffer(*buf, sync);
tx->commit();
}
std::string DiskObjectStorageMetadata::serializeToString()

View File

@ -1,6 +1,7 @@
#pragma once
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Core/Types.h>
namespace DB
@ -24,7 +25,7 @@ struct DiskObjectStorageMetadata
/// Relative path to metadata file on local FS.
const String metadata_file_path;
DiskPtr metadata_disk;
MetadataStoragePtr metadata_storage;
/// Total size of all remote FS (S3, HDFS) objects.
size_t total_size = 0;
@ -40,18 +41,18 @@ struct DiskObjectStorageMetadata
DiskObjectStorageMetadata(
const String & remote_fs_root_path_,
DiskPtr metadata_disk_,
MetadataStoragePtr metadata_storage_,
const String & metadata_file_path_);
void addObject(const String & path, size_t size);
static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_);
static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater);
static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite);
static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync);
static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, bool overwrite);
/// Serialize metadata to string (very same with saveToBuffer)
std::string serializeToString();

View File

@ -81,7 +81,7 @@ void DiskObjectStorageMetadataHelper::updateObjectMetadata(const String & key, c
void DiskObjectStorageMetadataHelper::migrateFileToRestorableSchema(const String & path) const
{
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_disk->getPath() + path);
LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path);
auto meta = disk->readMetadata(path);
@ -97,7 +97,7 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const S
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_disk->getPath() + path);
LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_storage->getPath() + path);
bool dir_contains_only_files = true;
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
@ -223,7 +223,9 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur
restoreFiles(source_object_storage, information);
restoreFileOperations(source_object_storage, information);
disk->metadata_disk->removeFile(RESTORE_FILE_NAME);
auto tx = disk->metadata_storage->createTransaction();
disk->metadata_storage->unlinkFile(RESTORE_FILE_NAME, tx);
tx->commit();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
@ -239,7 +241,7 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur
void DiskObjectStorageMetadataHelper::readRestoreInformation(RestoreInformation & restore_information) /// NOLINT
{
auto buffer = disk->metadata_disk->readFile(RESTORE_FILE_NAME, ReadSettings{}, 512);
auto buffer = disk->metadata_storage->readFile(RESTORE_FILE_NAME, ReadSettings{}, 512);
buffer->next();
try
@ -438,9 +440,11 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc
void DiskObjectStorage::onFreeze(const String & path)
{
createDirectories(path);
auto revision_file_buf = metadata_disk->writeFile(path + "revision.txt", 32);
auto tx = metadata_storage->createTransaction();
auto revision_file_buf = metadata_storage->writeFile(path + "revision.txt", tx, 32);
writeIntText(metadata_helper->revision_counter.load(), *revision_file_buf);
revision_file_buf->finalize();
tx->commit();
}
static String pathToDetached(const String & source_path)
@ -531,6 +535,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
{
Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"};
auto tx = disk->metadata_storage->createTransaction();
for (const auto & path : renames)
{
/// Skip already detached parts.
@ -557,12 +562,13 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou
to_path /= from_path.filename();
/// to_path may exist and non-empty in case for example abrupt restart, so remove it before rename
if (disk->metadata_disk->exists(to_path))
disk->metadata_disk->removeRecursive(to_path);
if (disk->metadata_storage->exists(to_path))
disk->metadata_storage->removeRecursive(to_path, tx);
disk->createDirectories(directoryPath(to_path));
disk->metadata_disk->moveDirectory(from_path, to_path);
disk->metadata_storage->moveDirectory(from_path, to_path, tx);
}
tx->commit();
}
LOG_INFO(disk->log, "File operations restored for disk {}", disk->name);

View File

@ -1,6 +1,7 @@
#include <Disks/ObjectStorages/HDFS/HDFSObjectStorage.h>
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <Disks/DiskFactory.h>
#include <Storages/HDFS/HDFSCommon.h>
@ -31,17 +32,20 @@ void registerDiskHDFS(DiskFactory & factory)
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
context_->getSettingsRef().hdfs_replication
);
/// FIXME Cache currently unsupported :(
ObjectStoragePtr hdfs_storage = std::make_unique<HDFSObjectStorage>(nullptr, uri, std::move(settings), config);
auto metadata_disk = prepareForLocalMetadata(name, config, config_prefix, context_).second;
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk);
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
return std::make_shared<DiskObjectStorage>(
name,
uri,
"DiskHDFS",
metadata_disk,
std::move(metadata_storage),
std::move(hdfs_storage),
DiskType::HDFS,
/* send_metadata = */ false,

View File

@ -1,5 +1,5 @@
#pragma once
=
#include <memory>
#include <vector>
#include <unordered_map>
@ -8,6 +8,7 @@
#include <IO/WriteSettings.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <Disks/DirectoryIterator.h>
#include <Disks/WriteMode.h>
namespace DB
@ -17,6 +18,7 @@ struct IMetadataOperation
{
virtual void execute() = 0;
virtual void undo() = 0;
virtual void finalize() {}
virtual ~IMetadataOperation() = default;
};
@ -38,13 +40,17 @@ class IMetadataStorage : private boost::noncopyable
{
public:
MetadataTransactionPtr createTransaction() const;
virtual MetadataTransactionPtr createTransaction() const = 0;
virtual const std::string & getPath() const = 0;
virtual bool exists(const std::string & path) const = 0;
virtual bool isFile(const std::string & path) const = 0;
virtual bool isDirectory(const std::string & path) const = 0;
virtual Poco::Timestamp getLastModified(const std::string & path) const = 0;
virtual std::vector<std::string> listDirectory(const std::string & path) const = 0;
virtual DirectoryIteratorPtr iterateDirectory(const String & path) = 0;
virtual std::unique_ptr<ReadBufferFromFileBase> readFile( /// NOLINT
const std::string & path,
@ -68,15 +74,19 @@ public:
virtual void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0;
virtual void createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
virtual void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0;
virtual void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
virtual void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
virtual void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
virtual void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0;
virtual ~IMetadataStorage() = default;
};
using MetadataStoragePtr = std::shared_ptr<IMetadataStorage>;
}

View File

@ -1,292 +0,0 @@
#include <Disks/ObjectStorages/LocalDiskMetadataStorage.h>
#include <ranges>
#include <filesystem>
namespace DB
{
namespace
{
/// Sets the modification time of `path`, keeping the previous value so undo() can restore it.
class SetLastModifiedOperation final : public IMetadataOperation
{
public:
    SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
        : path(path_), new_timestamp(new_timestamp_), disk(disk_)
    {
    }

    /// Remembers the current timestamp and then applies the new one.
    void execute() override
    {
        old_timestamp = disk.getLastModified(path);
        disk.setLastModified(path, new_timestamp);
    }

    /// Puts back the timestamp captured by execute().
    void undo() override { disk.setLastModified(path, old_timestamp); }

private:
    std::string path;
    Poco::Timestamp new_timestamp;
    Poco::Timestamp old_timestamp;
    IDisk & disk;
};
/// Removes the file at `path` when executed. Undo is not implemented.
class UnlinkFileOperation final : public IMetadataOperation
{
public:
    UnlinkFileOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_)
    {
    }

    void execute() override { disk.removeFile(path); }

    void undo() override
    {
        /// TODO Do something with it
    }

private:
    std::string path;
    IDisk & disk;
};
/// Creates a single directory; undo() removes it again.
class CreateDirectoryOperation final : public IMetadataOperation
{
public:
    CreateDirectoryOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_)
    {
    }

    void execute() override { disk.createDirectory(path); }

    void undo() override { disk.removeDirectory(path); }

private:
    std::string path;
    IDisk & disk;
};
/// Creates `path` together with every missing ancestor directory.
class CreateDirectoryRecursiveOperation : public IMetadataOperation
{
private:
    std::string path;
    /// Directories actually created by execute(), in creation order (outermost first).
    std::vector<std::string> paths_created;
    IDisk & disk;
public:
    CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_)
        : path(path_)
        , disk(disk_)
    {
    }

    void execute() override
    {
        namespace fs = std::filesystem;

        /// Walk upwards and collect every directory that does not exist yet (deepest first).
        std::vector<std::string> missing;
        fs::path p(path);
        while (!disk.exists(p))
        {
            missing.push_back(p);
            if (!p.has_parent_path())
                break;
            p = p.parent_path();
        }

        /// createDirectory() is used for one level at a time, so parents must be created before children.
        for (auto it = missing.rbegin(); it != missing.rend(); ++it)
        {
            disk.createDirectory(*it);
            paths_created.push_back(*it);
        }
    }

    void undo() override
    {
        /// Remove only what was really created, children before their parents.
        for (auto it = paths_created.rbegin(); it != paths_created.rend(); ++it)
            disk.removeDirectory(*it);
    }
};
/// Removes a directory; undo() creates a directory with the same path again.
class RemoveDirectoryOperation : public IMetadataOperation
{
public:
    RemoveDirectoryOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_)
    {
    }

    void execute() override { disk.removeDirectory(path); }

    void undo() override { disk.createDirectory(path); }

private:
    std::string path;
    IDisk & disk;
};
/// Creates a hard link `path_to` pointing at `path_from`; undo() removes the link.
class CreateHardlinkOperation : public IMetadataOperation
{
public:
    CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.createHardLink(path_from, path_to); }

    void undo() override { disk.removeFile(path_to); }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Moves a file from `path_from` to `path_to`; undo() moves it back.
class MoveFileOperation : public IMetadataOperation
{
public:
    MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.moveFile(path_from, path_to); }

    void undo() override { disk.moveFile(path_to, path_from); }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Replaces `path_to` with `path_from` via IDisk::replaceFile. Undo is not implemented.
class ReplaceFileOperation : public IMetadataOperation
{
public:
    ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.replaceFile(path_from, path_to); }

    void undo() override
    {
        /// TODO something with this
    }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Publishes a file that was written under `temp_path` by moving it to `path`.
class WriteFileOperation: public IMetadataOperation
{
public:
    WriteFileOperation(const std::string & path_, const std::string & temp_path_, IDisk & disk_)
        : path(path_), temp_path(temp_path_), disk(disk_)
    {
    }

    void execute() override { disk.moveFile(temp_path, path); }

    /// Drops both the published file and the temporary one, whichever of them exists.
    void undo() override
    {
        disk.removeFileIfExists(path);
        disk.removeFileIfExists(temp_path);
    }

private:
    std::string path;
    std::string temp_path;
    IDisk & disk;
};
}
std::unique_ptr<WriteBufferFromFileBase> LocalDiskMetadataStorage::writeFile( /// NOLINT
const std::string & path,
MetadataTransactionPtr transaction,
size_t buf_size,
const WriteSettings & settings)
{
std::string temp_path = path + "_tmp";
transaction->addOperation(std::make_unique<WriteFileOperation>(path, temp_path, *local_disk));
return local_disk->writeFile(temp_path, buf_size, WriteMode::Rewrite, settings);
}
/// Queues a modification-time change for `path` in `transaction`.
void LocalDiskMetadataStorage::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction)
{
    MetadataOperationPtr operation = std::make_unique<SetLastModifiedOperation>(path, timestamp, *local_disk);
    transaction->addOperation(std::move(operation));
}
void LocalDiskMetadataStorage::unlinkFile(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<UnlinkFileOperation>(path, *local_disk));
}
void LocalDiskMetadataStorage::createDirectory(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateDirectoryOperation>(path, *local_disk));
}
void LocalDiskMetadataStorage::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *local_disk));
}
void LocalDiskMetadataStorage::removeDirectory(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<RemoveDirectoryOperation>(path, *local_disk));
}
void LocalDiskMetadataStorage::createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *local_disk));
}
void LocalDiskMetadataStorage::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *local_disk));
}
void LocalDiskMetadataStorage::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *local_disk));
}
}

View File

@ -0,0 +1,558 @@
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <ranges>
#include <filesystem>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
namespace DB
{
namespace ErrorCodes
{
    extern const int FS_METADATA_ERROR;
    /// Used by MetadataStorageFromDiskTransaction::rollback() for an inconsistent FAILED state.
    extern const int LOGICAL_ERROR;
}
/// Returns the enumerator name of `state` as text.
std::string toString(MetadataFromDiskTransactionState state)
{
    using State = MetadataFromDiskTransactionState;

    const char * name = nullptr;
    switch (state)
    {
        case State::PREPARING:
            name = "PREPARING";
            break;
        case State::FAILED:
            name = "FAILED";
            break;
        case State::COMMITTED:
            name = "COMMITTED";
            break;
        case State::ROLLED_BACK:
            name = "ROLLED_BACK";
            break;
        case State::PARTIALLY_ROLLED_BACK:
            name = "PARTIALLY_ROLLED_BACK";
            break;
    }

    /// Every enumerator is handled above, so no other value is expected here.
    if (name == nullptr)
        __builtin_unreachable();
    return name;
}
namespace
{
/// Produces a temporary file name that starts with "tmp".
/// Poco is asked for a name inside the dummy directory "a/", and the leading "a/tmp"
/// of the result is then replaced with plain "tmp".
std::string getTempFileName()
{
    const std::string dummy_dir = "a/";
    const std::string expected_prefix = dummy_dir + "tmp";

    std::string generated = Poco::TemporaryFile::tempName(dummy_dir);
    assert(generated.starts_with(expected_prefix));

    generated.replace(0, expected_prefix.length(), "tmp");
    return generated;
}
/// Sets the modification time of `path`, keeping the previous value so undo() can restore it.
class SetLastModifiedOperation final : public IMetadataOperation
{
public:
    SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_)
        : path(path_), new_timestamp(new_timestamp_), disk(disk_)
    {
    }

    /// Remembers the current timestamp and then applies the new one.
    void execute() override
    {
        old_timestamp = disk.getLastModified(path);
        disk.setLastModified(path, new_timestamp);
    }

    /// Puts back the timestamp captured by execute().
    void undo() override { disk.setLastModified(path, old_timestamp); }

private:
    std::string path;
    Poco::Timestamp new_timestamp;
    Poco::Timestamp old_timestamp;
    IDisk & disk;
};
/// Removes a file in two steps: execute() moves it to a temporary name (so undo() can
/// move it back), and finalize() deletes the temporary file.
class UnlinkFileOperation final : public IMetadataOperation
{
public:
    UnlinkFileOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_), temp_filepath(getTempFileName())
    {
    }

    void execute() override { disk.moveFile(path, temp_filepath); }

    void undo() override { disk.moveFile(temp_filepath, path); }

    void finalize() override { disk.removeFileIfExists(temp_filepath); }

private:
    std::string path;
    IDisk & disk;
    std::string temp_filepath;
};
/// Creates a single directory; undo() removes it again.
class CreateDirectoryOperation final : public IMetadataOperation
{
public:
    CreateDirectoryOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_)
    {
    }

    void execute() override { disk.createDirectory(path); }

    void undo() override { disk.removeDirectory(path); }

private:
    std::string path;
    IDisk & disk;
};
/// Creates `path` together with every missing ancestor directory.
class CreateDirectoryRecursiveOperation : public IMetadataOperation
{
private:
    std::string path;
    /// Directories actually created by execute(), in creation order (outermost first).
    std::vector<std::string> paths_created;
    IDisk & disk;
public:
    CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_)
        : path(path_)
        , disk(disk_)
    {
    }

    void execute() override
    {
        namespace fs = std::filesystem;

        /// Walk upwards and collect every directory that does not exist yet (deepest first).
        std::vector<std::string> missing;
        fs::path p(path);
        while (!disk.exists(p))
        {
            missing.push_back(p);
            if (!p.has_parent_path())
                break;
            p = p.parent_path();
        }

        /// createDirectory() is used for one level at a time, so parents must be created before children.
        for (auto it = missing.rbegin(); it != missing.rend(); ++it)
        {
            disk.createDirectory(*it);
            paths_created.push_back(*it);
        }
    }

    void undo() override
    {
        /// Remove only what was really created, children before their parents.
        for (auto it = paths_created.rbegin(); it != paths_created.rend(); ++it)
            disk.removeDirectory(*it);
    }
};
/// Removes a directory; undo() creates a directory with the same path again.
class RemoveDirectoryOperation : public IMetadataOperation
{
public:
    RemoveDirectoryOperation(const std::string & path_, IDisk & disk_)
        : path(path_), disk(disk_)
    {
    }

    void execute() override { disk.removeDirectory(path); }

    void undo() override { disk.createDirectory(path); }

private:
    std::string path;
    IDisk & disk;
};
/// Removes a file or a whole directory. execute() moves the entry aside to a temporary
/// name so that undo() can move it back; finalize() deletes the moved-aside copy.
class RemoveRecursiveOperation : public IMetadataOperation
{
    std::string path;
    IDisk & disk;
    std::string temp_path;
public:
    RemoveRecursiveOperation(const std::string & path_, IDisk & disk_)
        : path(path_)
        , disk(disk_)
        , temp_path(getTempFileName())
    {
    }

    void execute() override
    {
        if (disk.isFile(path))
            disk.moveFile(path, temp_path);
        else if (disk.isDirectory(path))
            disk.moveDirectory(path, temp_path);
    }

    void undo() override
    {
        if (disk.isFile(temp_path))
            disk.moveFile(temp_path, path);
        else if (disk.isDirectory(temp_path))
            disk.moveDirectory(temp_path, path);
    }

    void finalize() override
    {
        /// Only the moved-aside copy belongs to this operation. finalize() runs after all
        /// operations of the transaction were executed, so anything found at `path` now was
        /// put there by a later operation (e.g. a move into the just-removed location)
        /// and must not be deleted.
        if (disk.exists(temp_path))
            disk.removeRecursive(temp_path);
    }
};
/// Creates a hard link `path_to` pointing at `path_from`; undo() removes the link.
class CreateHardlinkOperation : public IMetadataOperation
{
public:
    CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.createHardLink(path_from, path_to); }

    void undo() override { disk.removeFile(path_to); }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Moves a file from `path_from` to `path_to`; undo() moves it back.
class MoveFileOperation : public IMetadataOperation
{
public:
    MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.moveFile(path_from, path_to); }

    void undo() override { disk.moveFile(path_to, path_from); }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Moves a directory from `path_from` to `path_to`; undo() moves it back.
class MoveDirectoryOperation : public IMetadataOperation
{
public:
    MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_), path_to(path_to_), disk(disk_)
    {
    }

    void execute() override { disk.moveDirectory(path_from, path_to); }

    void undo() override { disk.moveDirectory(path_to, path_from); }

private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
};
/// Replaces `path_to` with `path_from`. The previous content of `path_to` (if any) is kept
/// under a temporary name until finalize(), so that undo() can restore it.
class ReplaceFileOperation : public IMetadataOperation
{
private:
    std::string path_from;
    std::string path_to;
    IDisk & disk;
    std::string temp_path_to;
public:
    ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_)
        : path_from(path_from_)
        , path_to(path_to_)
        , disk(disk_)
        , temp_path_to(getTempFileName())
    {
    }

    void execute() override
    {
        /// Keep the old destination aside; it may legitimately be absent.
        if (disk.exists(path_to))
            disk.moveFile(path_to, temp_path_to);

        disk.replaceFile(path_from, path_to);
    }

    void undo() override
    {
        /// Give the source back only if the replacement really consumed it.
        if (!disk.exists(path_from) && disk.exists(path_to))
            disk.moveFile(path_to, path_from);

        /// There is a saved destination only if one existed before execute().
        if (disk.exists(temp_path_to))
            disk.moveFile(temp_path_to, path_to);
    }

    void finalize() override
    {
        disk.removeFileIfExists(temp_path_to);
    }
};
/// Publishes a file that was written under `temp_path` by moving it to `path`.
class WriteFileOperation: public IMetadataOperation
{
public:
    WriteFileOperation(const std::string & path_, const std::string & temp_path_, IDisk & disk_)
        : path(path_), temp_path(temp_path_), disk(disk_)
    {
    }

    void execute() override { disk.moveFile(temp_path, path); }

    /// Drops both the published file and the temporary one, whichever of them exists.
    void undo() override
    {
        disk.removeFileIfExists(path);
        disk.removeFileIfExists(temp_path);
    }

private:
    std::string path;
    std::string temp_path;
    IDisk & disk;
};
}
std::unique_ptr<WriteBufferFromFileBase> MetadataStorageFromDisk::writeFile( /// NOLINT
const std::string & path,
MetadataTransactionPtr transaction,
size_t buf_size,
const WriteSettings & settings)
{
std::string temp_path = getTempFileName();
transaction->addOperation(std::make_unique<WriteFileOperation>(path, temp_path, *disk));
return disk->writeFile(temp_path, buf_size, WriteMode::Rewrite, settings);
}
/// Appends `operation` to the transaction. Throws FS_METADATA_ERROR unless the
/// transaction is still in the PREPARING state.
void MetadataStorageFromDiskTransaction::addOperation(MetadataOperationPtr && operation)
{
    using State = MetadataFromDiskTransactionState;

    const bool accepts_operations = (state == State::PREPARING);
    if (!accepts_operations)
        throw Exception(
            ErrorCodes::FS_METADATA_ERROR,
            "Cannot add operations to transaction in {} state, it should be in {} state",
            toString(state),
            toString(State::PREPARING));

    operations.emplace_back(std::move(operation));
}
/// Executes all queued operations in order and then finalizes them.
/// Throws FS_METADATA_ERROR if the transaction is not in the PREPARING state.
/// If an operation throws, its index is remembered, the transaction becomes FAILED
/// and the exception is rethrown (no rollback is performed here).
void MetadataStorageFromDiskTransaction::commit()
{
    /// Only a transaction that was never committed or rolled back may be committed.
    if (state != MetadataFromDiskTransactionState::PREPARING)
        throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot commit transaction in {} state, it should be in {} state",
            toString(state), toString(MetadataFromDiskTransactionState::PREPARING));

    for (size_t i = 0; i < operations.size(); ++i)
    {
        try
        {
            operations[i]->execute();
        }
        catch (Exception & ex)
        {
            ex.addMessage(fmt::format("While commiting operation #{}", i));
            failed_operation_index = i;
            state = MetadataFromDiskTransactionState::FAILED;
            throw;
        }
        catch (...)
        {
            /// Exceptions of other types must mark the transaction as failed too,
            /// otherwise the destructor would not roll it back.
            failed_operation_index = i;
            state = MetadataFromDiskTransactionState::FAILED;
            throw;
        }
    }

    /// Do it in "best effort" mode
    for (size_t i = 0; i < operations.size(); ++i)
    {
        try
        {
            operations[i]->finalize();
        }
        catch (...)
        {
            tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", i));
        }
    }

    state = MetadataFromDiskTransactionState::COMMITTED;
}
void MetadataStorageFromDiskTransaction::rollback()
{
/// Otherwise everything is alright
if (state == MetadataFromDiskTransactionState::FAILED)
{
if (!failed_operation_index.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction in failed state, but has not failed operations. It's a bug");
for (int64_t i = failed_operation_index.value(); i >= 0; --i)
{
try
{
operations[i]->undo();
}
catch (Exception & ex)
{
state = MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK;
ex.addMessage(fmt::format("While rolling back operation #{}", i));
throw;
}
}
}
else
{
/// Nothing to do, transaction commited or not even started to commit
}
state = MetadataFromDiskTransactionState::ROLLED_BACK;
}
/// Rolls back a transaction that is still in the FAILED state when it is destroyed.
/// Exceptions from the rollback are logged and never leave the destructor.
MetadataStorageFromDiskTransaction::~MetadataStorageFromDiskTransaction()
{
    if (state != MetadataFromDiskTransactionState::FAILED)
        return;

    try
    {
        rollback();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
/// Returns the path reported by the underlying disk.
const std::string & MetadataStorageFromDisk::getPath() const
{
    const std::string & disk_path = disk->getPath();
    return disk_path;
}
/// Forwards the existence check for `path` to the underlying disk.
bool MetadataStorageFromDisk::exists(const std::string & path) const
{
    const bool is_present = disk->exists(path);
    return is_present;
}
/// Forwards the "is a file" check for `path` to the underlying disk.
bool MetadataStorageFromDisk::isFile(const std::string & path) const
{
    const bool is_file = disk->isFile(path);
    return is_file;
}
/// Forwards the "is a directory" check for `path` to the underlying disk.
bool MetadataStorageFromDisk::isDirectory(const std::string & path) const
{
    const bool is_directory = disk->isDirectory(path);
    return is_directory;
}
/// Returns the modification time of `path` as reported by the underlying disk.
Poco::Timestamp MetadataStorageFromDisk::getLastModified(const std::string & path) const
{
    Poco::Timestamp modified_at = disk->getLastModified(path);
    return modified_at;
}
/// Returns the entries that the underlying disk lists for the directory `path`.
std::vector<std::string> MetadataStorageFromDisk::listDirectory(const std::string & path) const
{
    std::vector<std::string> entries;
    /// IDisk::listFiles fills the vector passed as the output argument.
    disk->listFiles(path, entries);
    return entries;
}
/// Returns the directory iterator that the underlying disk creates for `path`.
DirectoryIteratorPtr MetadataStorageFromDisk::iterateDirectory(const std::string & path)
{
    DirectoryIteratorPtr iterator = disk->iterateDirectory(path);
    return iterator;
}
/// Opens `path` for reading on the underlying disk, passing all arguments through unchanged.
std::unique_ptr<ReadBufferFromFileBase> MetadataStorageFromDisk::readFile( /// NOLINT
    const std::string & path,
    const ReadSettings & settings,
    std::optional<size_t> read_hint,
    std::optional<size_t> file_size) const
{
    auto read_buffer = disk->readFile(path, settings, read_hint, file_size);
    return read_buffer;
}
/// Queues a modification-time change for `path` in `transaction`.
void MetadataStorageFromDisk::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction)
{
    MetadataOperationPtr operation = std::make_unique<SetLastModifiedOperation>(path, timestamp, *disk);
    transaction->addOperation(std::move(operation));
}
void MetadataStorageFromDisk::unlinkFile(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<UnlinkFileOperation>(path, *disk));
}
void MetadataStorageFromDisk::removeRecursive(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<RemoveRecursiveOperation>(path, *disk));
}
void MetadataStorageFromDisk::createDirectory(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateDirectoryOperation>(path, *disk));
}
void MetadataStorageFromDisk::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateDirectoryRecursiveOperation>(path, *disk));
}
void MetadataStorageFromDisk::removeDirectory(const std::string & path, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<RemoveDirectoryOperation>(path, *disk));
}
void MetadataStorageFromDisk::createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<CreateHardlinkOperation>(path_from, path_to, *disk));
}
void MetadataStorageFromDisk::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<MoveFileOperation>(path_from, path_to, *disk));
}
void MetadataStorageFromDisk::moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<MoveDirectoryOperation>(path_from, path_to, *disk));
}
void MetadataStorageFromDisk::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction)
{
transaction->addOperation(std::make_unique<ReplaceFileOperation>(path_from, path_to, *disk));
}
}

View File

@ -1,83 +1,71 @@
#pragma once
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <Disks/DiskLocal.h>
#include <Disks/IDisk.h>
namespace DB
{
enum class LocalMetadataTransactionState
enum class MetadataFromDiskTransactionState
{
PREPARING,
COMMITED,
ROLLED_BACK
FAILED,
COMMITTED,
ROLLED_BACK,
PARTIALLY_ROLLED_BACK,
};
struct LocalDiskMetadataTransaction : public IMetadataTransaction
std::string toString(MetadataFromDiskTransactionState state);
struct MetadataStorageFromDiskTransaction : public IMetadataTransaction
{
private:
std::vector<MetadataOperationPtr> operations;
LocalMetadataTransactionState state{LocalMetadataTransactionState::PREPARING};
public:
void addOperation(MetadataOperationPtr && operation) override
{
operations.emplace_back(std::move(operation));
}
std::optional<size_t> failed_operation_index;
std::vector<MetadataOperationPtr> operations;
MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING};
public:
void addOperation(MetadataOperationPtr && operation) override;
void commit() override;
void rollback() override;
~MetadataStorageFromDiskTransaction() override;
};
class LocalDiskMetadataStorage : public IMetadataStorage
class MetadataStorageFromDisk : public IMetadataStorage
{
private:
DiskPtr local_disk;
DiskPtr disk;
public:
explicit LocalDiskMetadataStorage(DiskPtr local_disk_)
: local_disk(local_disk_)
explicit MetadataStorageFromDisk(DiskPtr disk_)
: disk(disk_)
{
}
MetadataTransactionPtr createTransaction() const
MetadataTransactionPtr createTransaction() const override
{
return std::make_shared<LocalDiskMetadataTransaction>(local_disk);
return std::make_shared<MetadataStorageFromDiskTransaction>();
}
bool exists(const std::string & path) const override
{
return local_disk->exists(path);
}
const std::string & getPath() const override;
bool isFile(const std::string & path) const override
{
return local_disk->isFile(path);
}
bool exists(const std::string & path) const override;
bool isDirectory(const std::string & path) const override
{
return local_disk->isDirectory(path);
}
bool isFile(const std::string & path) const override;
Poco::Timestamp getLastModified(const std::string & path) const override
{
return local_disk->getLastModified(path);
}
bool isDirectory(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override
{
std::vector<std::string> result_files;
local_disk->listFiles(path, result_files);
return result_files;
}
Poco::Timestamp getLastModified(const std::string & path) const override;
std::vector<std::string> listDirectory(const std::string & path) const override;
DirectoryIteratorPtr iterateDirectory(const std::string & path) override;
std::unique_ptr<ReadBufferFromFileBase> readFile( /// NOLINT
const std::string & path,
const ReadSettings & settings = ReadSettings{},
std::optional<size_t> read_hint = {},
std::optional<size_t> file_size = {}) const override
{
return local_disk->readFile(path, settings, read_hint, file_size);
}
std::optional<size_t> file_size = {}) const override;
std::unique_ptr<WriteBufferFromFileBase> writeFile( /// NOLINT
const std::string & path,
@ -95,10 +83,14 @@ public:
void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) override;
void createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) override;
void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override;
};

View File

@ -24,6 +24,7 @@
#include <Disks/ObjectStorages/S3/ProxyResolverConfiguration.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Disks/ObjectStorages/S3/diskSettings.h>
#include <Disks/ObjectStorages/MetadataStorageFromDisk.h>
#include <IO/S3Common.h>
@ -85,6 +86,8 @@ void registerDiskS3(DiskFactory & factory)
auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk);
FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>(
@ -99,7 +102,7 @@ void registerDiskS3(DiskFactory & factory)
name,
uri.key,
"DiskS3",
metadata_disk,
std::move(metadata_storage),
std::move(s3_storage),
DiskType::S3,
send_metadata,

View File

@ -17,7 +17,7 @@ std::optional<std::string> MergeTreeIndexGranularityInfo::getMarksExtensionFromF
{
if (disk->exists(path_to_part))
{
for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
for (DirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next())
{
const auto & ext = fs::path(it->path()).extension();
if (ext == getNonAdaptiveMrkExtension()

View File

@ -10,6 +10,8 @@
#include <Common/thread_local_rng.h>
#include <Common/typeid_cast.h>
#include <Disks/ObjectStorages/IMetadataStorage.h>
#include <base/sort.h>
#include <Storages/AlterCommands.h>
@ -8154,10 +8156,12 @@ public:
void save(DiskPtr data_disk, const String & path) const
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto metadata_storage = data_disk->getMetadataStorage();
auto file_path = getFileName(path);
auto buffer = metadata_disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite);
auto tx = metadata_storage->createTransaction();
auto buffer = metadata_storage->writeFile(file_path, tx, DBMS_DEFAULT_BUFFER_SIZE);
writeIntText(version, *buffer);
buffer->write("\n", 1);
writeBoolText(is_replicated, *buffer);
@ -8170,16 +8174,18 @@ public:
buffer->write("\n", 1);
writeString(table_shared_id, *buffer);
buffer->write("\n", 1);
tx->commit();
}
bool load(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
auto metadata_storage = data_disk->getMetadataStorage();
auto file_path = getFileName(path);
if (!metadata_disk->exists(file_path))
if (!metadata_storage->exists(file_path))
return false;
auto buffer = metadata_disk->readFile(file_path, ReadSettings(), {});
auto buffer = metadata_storage->readFile(file_path, ReadSettings(), {});
readIntText(version, *buffer);
if (version != 1)
{
@ -8202,8 +8208,14 @@ public:
static void clean(DiskPtr data_disk, const String & path)
{
auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf();
metadata_disk->removeFileIfExists(getFileName(path));
auto metadata_storage = data_disk->getMetadataStorage();
auto fname = getFileName(path);
if (metadata_storage->exists(fname))
{
auto tx = metadata_storage->createTransaction();
metadata_storage->unlinkFile(fname, tx);
tx->commit();
}
}
private: