From 46bccae078d825683aa702ee8725622107211324 Mon Sep 17 00:00:00 2001 From: alesapin Date: Thu, 2 Jun 2022 18:09:40 +0200 Subject: [PATCH] Trying to create metadata layer --- src/Common/ErrorCodes.cpp | 1 + src/Disks/DirectoryIterator.h | 31 + src/Disks/DiskDecorator.cpp | 2 +- src/Disks/DiskDecorator.h | 4 +- src/Disks/DiskEncrypted.h | 2 +- src/Disks/DiskLocal.cpp | 4 +- src/Disks/DiskLocal.h | 2 +- src/Disks/DiskMemory.cpp | 4 +- src/Disks/DiskMemory.h | 2 +- src/Disks/DiskRestartProxy.cpp | 2 +- src/Disks/DiskRestartProxy.h | 2 +- src/Disks/DiskWebServer.cpp | 4 +- src/Disks/DiskWebServer.h | 2 +- src/Disks/IDisk.cpp | 3 + src/Disks/IDisk.h | 36 +- .../registerDiskAzureBlobStorage.cpp | 5 +- .../ObjectStorages/DiskObjectStorage.cpp | 78 ++- src/Disks/ObjectStorages/DiskObjectStorage.h | 11 +- .../DiskObjectStorageMetadata.cpp | 52 +- .../DiskObjectStorageMetadata.h | 17 +- .../DiskObjectStorageMetadataHelper.cpp | 22 +- .../ObjectStorages/HDFS/registerDiskHDFS.cpp | 6 +- src/Disks/ObjectStorages/IMetadataStorage.h | 20 +- .../LocalDiskMetadataStorage.cpp | 292 --------- .../MetadataStorageFromDisk.cpp | 558 ++++++++++++++++++ ...ataStorage.h => MetadataStorageFromDisk.h} | 82 ++- .../ObjectStorages/S3/registerDiskS3.cpp | 5 +- .../MergeTreeIndexGranularityInfo.cpp | 2 +- src/Storages/StorageReplicatedMergeTree.cpp | 26 +- 29 files changed, 807 insertions(+), 470 deletions(-) create mode 100644 src/Disks/DirectoryIterator.h delete mode 100644 src/Disks/ObjectStorages/LocalDiskMetadataStorage.cpp create mode 100644 src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp rename src/Disks/ObjectStorages/{LocalDiskMetadataStorage.h => MetadataStorageFromDisk.h} (55%) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 973dde10756..a416262ae9f 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -629,6 +629,7 @@ M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \ M(659, UNKNOWN_STATUS_OF_TRANSACTION) \ M(660, HDFS_ERROR) \ + M(661, FS_METADATA_ERROR) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Disks/DirectoryIterator.h b/src/Disks/DirectoryIterator.h new file mode 100644 index 00000000000..6021effa77a --- /dev/null +++ b/src/Disks/DirectoryIterator.h @@ -0,0 +1,31 @@ +#pragma once + +#include +#include + +namespace DB +{ +/** + * Iterator of directory contents + */ +class IDirectoryIterator +{ +public: + /// Iterate to the next file. + virtual void next() = 0; + + /// Return `true` if the iterator points to a valid element. + virtual bool isValid() const = 0; + + /// Path to the file that the iterator currently points to. + virtual std::string path() const = 0; + + /// Name of the file that the iterator currently points to. + virtual std::string name() const = 0; + + virtual ~IDirectoryIterator() = default; +}; + +using DirectoryIteratorPtr = std::unique_ptr; + +} diff --git a/src/Disks/DiskDecorator.cpp b/src/Disks/DiskDecorator.cpp index 02babfbb59f..412897d3a0a 100644 --- a/src/Disks/DiskDecorator.cpp +++ b/src/Disks/DiskDecorator.cpp @@ -83,7 +83,7 @@ void DiskDecorator::moveDirectory(const String & from_path, const String & to_pa delegate->moveDirectory(from_path, to_path); } -DiskDirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskDecorator::iterateDirectory(const String & path) { return delegate->iterateDirectory(path); } diff --git a/src/Disks/DiskDecorator.h b/src/Disks/DiskDecorator.h index 883e58f9c8c..f2896f11094 100644 --- a/src/Disks/DiskDecorator.h +++ b/src/Disks/DiskDecorator.h @@ -28,7 +28,7 @@ public: void createDirectories(const String & path) override; void clearDirectory(const String & path) override; void moveDirectory(const String & from_path, const String & to_path) override; - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + DirectoryIteratorPtr iterateDirectory(const String & path) override; void createFile(const String & path) override; void moveFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override; @@ -77,7 +77,7 @@ public: std::vector getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); } void getRemotePathsRecursive(const String & path, std::vector & paths_map) override { return delegate->getRemotePathsRecursive(path, paths_map); } - DiskPtr getMetadataDiskIfExistsOrSelf() override { return delegate->getMetadataDiskIfExistsOrSelf(); } + MetadataStoragePtr getMetadataStorage() override { return delegate->getMetadataStorage(); } std::unordered_map getSerializedMetadata(const std::vector & file_paths) const override { return delegate->getSerializedMetadata(file_paths); } diff --git a/src/Disks/DiskEncrypted.h b/src/Disks/DiskEncrypted.h index 14793818f07..b17a9bc6974 100644 --- a/src/Disks/DiskEncrypted.h +++ b/src/Disks/DiskEncrypted.h @@ -83,7 +83,7 @@ public: delegate->moveDirectory(wrapped_from_path, wrapped_to_path); } - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override + DirectoryIteratorPtr iterateDirectory(const String & path) override { auto wrapped_path = wrappedPath(path); return delegate->iterateDirectory(wrapped_path); diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index 85de683b431..a6addf3c2f5 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -173,7 +173,7 @@ private: }; -class DiskLocalDirectoryIterator final : public IDiskDirectoryIterator +class DiskLocalDirectoryIterator final : public IDirectoryIterator { public: DiskLocalDirectoryIterator() = default; @@ -325,7 +325,7 @@ void DiskLocal::moveDirectory(const String & from_path, const String & to_path) fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path); } -DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path) { fs::path meta_path = fs::path(disk_path) / path; if (!broken && fs::exists(meta_path) && fs::is_directory(meta_path)) diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index cb3356ab7f2..0d1a2e9cdda 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -58,7 +58,7 @@ public: void moveDirectory(const String & from_path, const String & to_path) override; - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + DirectoryIteratorPtr iterateDirectory(const String & path) override; void createFile(const String & path) override; diff --git a/src/Disks/DiskMemory.cpp b/src/Disks/DiskMemory.cpp index 4f0e881e079..5021b68aa65 100644 --- a/src/Disks/DiskMemory.cpp +++ b/src/Disks/DiskMemory.cpp @@ -20,7 +20,7 @@ namespace ErrorCodes } -class DiskMemoryDirectoryIterator final : public IDiskDirectoryIterator +class DiskMemoryDirectoryIterator final : public IDirectoryIterator { public: explicit DiskMemoryDirectoryIterator(std::vector && dir_file_paths_) @@ -262,7 +262,7 @@ void DiskMemory::moveDirectory(const String & /*from_path*/, const String & /*to throw Exception("Method moveDirectory is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); } -DiskDirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskMemory::iterateDirectory(const String & path) { std::lock_guard lock(mutex); diff --git a/src/Disks/DiskMemory.h b/src/Disks/DiskMemory.h index 726be8bc3b5..4f09e63814a 100644 --- a/src/Disks/DiskMemory.h +++ b/src/Disks/DiskMemory.h @@ -52,7 +52,7 @@ public: void moveDirectory(const String & from_path, const String & to_path) override; - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + DirectoryIteratorPtr iterateDirectory(const String & path) override; void createFile(const String & path) override; diff --git a/src/Disks/DiskRestartProxy.cpp b/src/Disks/DiskRestartProxy.cpp index bf97c2cfe49..35feb29749e 100644 --- a/src/Disks/DiskRestartProxy.cpp +++ b/src/Disks/DiskRestartProxy.cpp @@ -171,7 +171,7 @@ void DiskRestartProxy::moveDirectory(const String & from_path, const String & to DiskDecorator::moveDirectory(from_path, to_path); } -DiskDirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskRestartProxy::iterateDirectory(const String & path) { ReadLock lock (mutex); return DiskDecorator::iterateDirectory(path); diff --git a/src/Disks/DiskRestartProxy.h b/src/Disks/DiskRestartProxy.h index 084e06e3f18..a6dcb068372 100644 --- a/src/Disks/DiskRestartProxy.h +++ b/src/Disks/DiskRestartProxy.h @@ -37,7 +37,7 @@ public: void createDirectories(const String & path) override; void clearDirectory(const String & path) override; void moveDirectory(const String & from_path, const String & to_path) override; - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + DirectoryIteratorPtr iterateDirectory(const String & path) override; void createFile(const String & path) override; void moveFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override; diff --git a/src/Disks/DiskWebServer.cpp b/src/Disks/DiskWebServer.cpp index b8a0d12d6c1..eaeafe447aa 100644 --- a/src/Disks/DiskWebServer.cpp +++ b/src/Disks/DiskWebServer.cpp @@ -95,7 +95,7 @@ void DiskWebServer::initialize(const String & uri_path) const } -class DiskWebServerDirectoryIterator final : public IDiskDirectoryIterator +class DiskWebServerDirectoryIterator final : public IDirectoryIterator { public: explicit DiskWebServerDirectoryIterator(std::vector && dir_file_paths_) @@ -188,7 +188,7 @@ std::unique_ptr DiskWebServer::readFile(const String & p } -DiskDirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskWebServer::iterateDirectory(const String & path) { std::vector dir_file_paths; if (files.find(path) == files.end()) diff --git a/src/Disks/DiskWebServer.h b/src/Disks/DiskWebServer.h index 47042fabc3d..4e66cb6f1ff 100644 --- a/src/Disks/DiskWebServer.h +++ b/src/Disks/DiskWebServer.h @@ -96,7 +96,7 @@ public: bool isDirectory(const String & path) const override; - DiskDirectoryIteratorPtr iterateDirectory(const String & /* path */) override; + DirectoryIteratorPtr iterateDirectory(const String & /* path */) override; Poco::Timestamp getLastModified(const String &) override { return Poco::Timestamp{}; } diff --git a/src/Disks/IDisk.cpp b/src/Disks/IDisk.cpp index 363cab3c539..f15025ce45b 100644 --- a/src/Disks/IDisk.cpp +++ b/src/Disks/IDisk.cpp @@ -6,6 +6,7 @@ #include #include #include +#include namespace DB { @@ -99,4 +100,6 @@ SyncGuardPtr IDisk::getDirectorySyncGuard(const String & /* path */) const return nullptr; } +MetadataStoragePtr IDisk::getMetadataStorage() { return std::make_shared(std::static_pointer_cast(shared_from_this())); } + } diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index f243f477f2c..31cbdfac23b 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include @@ -39,9 +40,6 @@ namespace ErrorCodes extern const int NOT_IMPLEMENTED; } -class IDiskDirectoryIterator; -using DiskDirectoryIteratorPtr = std::unique_ptr; - class IReservation; using ReservationPtr = std::unique_ptr; using Reservations = std::vector; @@ -49,6 +47,8 @@ using Reservations = std::vector; class ReadBufferFromFileBase; class WriteBufferFromFileBase; class MMappedFileCache; +class IMetadataStorage; +using MetadataStoragePtr = std::shared_ptr; /** @@ -92,7 +92,10 @@ class IDisk : public Space { public: /// Default constructor. - explicit IDisk(std::unique_ptr executor_ = std::make_unique()) : executor(std::move(executor_)) { } + explicit IDisk(std::unique_ptr executor_ = std::make_unique()) + : executor(std::move(executor_)) + { + } /// Root path for all files stored on the disk. /// It's not required to be a local filesystem path. @@ -135,7 +138,7 @@ public: virtual void moveDirectory(const String & from_path, const String & to_path) = 0; /// Return iterator to the contents of the specified directory. - virtual DiskDirectoryIteratorPtr iterateDirectory(const String & path) = 0; + virtual DirectoryIteratorPtr iterateDirectory(const String & path) = 0; /// Return `true` if the specified directory is empty. bool isDirectoryEmpty(const String & path); @@ -317,7 +320,7 @@ public: /// Actually it's a part of IDiskRemote implementation but we have so /// complex hierarchy of disks (with decorators), so we cannot even /// dynamic_cast some pointer to IDisk to pointer to IDiskRemote. - virtual std::shared_ptr getMetadataDiskIfExistsOrSelf() { return std::static_pointer_cast(shared_from_this()); } + virtual MetadataStoragePtr getMetadataStorage(); /// Very similar case as for getMetadataDiskIfExistsOrSelf(). If disk has "metadata" /// it will return mapping for each required path: path -> metadata as string. @@ -364,27 +367,6 @@ private: using DiskPtr = std::shared_ptr; using Disks = std::vector; -/** - * Iterator of directory contents on particular disk. - */ -class IDiskDirectoryIterator -{ -public: - /// Iterate to the next file. - virtual void next() = 0; - - /// Return `true` if the iterator points to a valid element. - virtual bool isValid() const = 0; - - /// Path to the file that the iterator currently points to. - virtual String path() const = 0; - - /// Name of the file that the iterator currently points to. - virtual String name() const = 0; - - virtual ~IDiskDirectoryIterator() = default; -}; - /** * Information about reserved size on particular disk. */ diff --git a/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp b/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp index 92ba6e426b3..b71c77dded8 100644 --- a/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp +++ b/src/Disks/ObjectStorages/AzureBlobStorage/registerDiskAzureBlobStorage.cpp @@ -11,6 +11,7 @@ #include #include +#include namespace DB { @@ -82,11 +83,13 @@ void registerDiskAzureBlobStorage(DiskFactory & factory) uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16); bool send_metadata = config.getBool(config_prefix + ".send_metadata", false); + auto metadata_storage = std::make_shared(metadata_disk); + std::shared_ptr azure_blob_storage_disk = std::make_shared( name, /* no namespaces */"", "DiskAzureBlobStorage", - metadata_disk, + std::move(metadata_storage), std::move(azure_object_storage), DiskType::AzureBlobStorage, send_metadata, diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index 208e05fa1bc..8f62b2da8e4 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -89,7 +89,7 @@ DiskObjectStorage::DiskObjectStorage( const String & name_, const String & remote_fs_root_path_, const String & log_name, - DiskPtr metadata_disk_, + MetadataStoragePtr && metadata_storage_, ObjectStoragePtr && object_storage_, DiskType disk_type_, bool send_metadata_, @@ -98,8 +98,8 @@ DiskObjectStorage::DiskObjectStorage( , name(name_) , remote_fs_root_path(remote_fs_root_path_) , log (&Poco::Logger::get(log_name)) - , metadata_disk(metadata_disk_) , disk_type(disk_type_) + , metadata_storage(std::move(metadata_storage_)) , object_storage(std::move(object_storage_)) , send_metadata(send_metadata_) , metadata_helper(std::make_unique(this, ReadSettings{})) @@ -107,7 +107,7 @@ DiskObjectStorage::DiskObjectStorage( DiskObjectStorage::Metadata DiskObjectStorage::readMetadataUnlocked(const String & path, std::shared_lock &) const { - return Metadata::readMetadata(remote_fs_root_path, metadata_disk, path); + return Metadata::readMetadata(remote_fs_root_path, metadata_storage, path); } @@ -120,37 +120,37 @@ DiskObjectStorage::Metadata DiskObjectStorage::readMetadata(const String & path) DiskObjectStorage::Metadata DiskObjectStorage::readUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater) { std::unique_lock lock(metadata_mutex); - return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater); + return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater); } void DiskObjectStorage::readUpdateStoreMetadataAndRemove(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater) { std::unique_lock lock(metadata_mutex); - Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater); + Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_storage, path, sync, updater); } DiskObjectStorage::Metadata DiskObjectStorage::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, DiskObjectStorage::MetadataUpdater updater) { - if (mode == WriteMode::Rewrite || !metadata_disk->exists(path)) + if (mode == WriteMode::Rewrite || !metadata_storage->exists(path)) { std::unique_lock lock(metadata_mutex); - return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater); + return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater); } else { - return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater); + return Metadata::readUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater); } } DiskObjectStorage::Metadata DiskObjectStorage::createAndStoreMetadata(const String & path, bool sync) { - return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync); + return Metadata::createAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync); } DiskObjectStorage::Metadata DiskObjectStorage::createUpdateAndStoreMetadata(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater) { - return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_disk, path, sync, updater); + return Metadata::createUpdateAndStoreMetadata(remote_fs_root_path, metadata_storage, path, sync, updater); } std::vector DiskObjectStorage::getRemotePaths(const String & local_path) const @@ -168,7 +168,7 @@ std::vector DiskObjectStorage::getRemotePaths(const String & local_path) void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::vector & paths_map) { /// Protect against concurrent delition of files (for example because of a merge). - if (metadata_disk->isFile(local_path)) + if (metadata_storage->isFile(local_path)) { try { @@ -188,7 +188,7 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std:: } else { - DiskDirectoryIteratorPtr it; + DirectoryIteratorPtr it; try { it = iterateDirectory(local_path); @@ -216,13 +216,13 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std:: bool DiskObjectStorage::exists(const String & path) const { - return metadata_disk->exists(path); + return metadata_storage->exists(path); } bool DiskObjectStorage::isFile(const String & path) const { - return metadata_disk->isFile(path); + return metadata_storage->isFile(path); } @@ -255,7 +255,9 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat { std::unique_lock lock(metadata_mutex); - metadata_disk->moveFile(from_path, to_path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->moveFile(from_path, to_path, tx); + tx->commit(); } } @@ -351,7 +353,9 @@ void DiskObjectStorage::createHardLink(const String & src_path, const String & d } /// Create FS hardlink to metadata file. - metadata_disk->createHardLink(src_path, dst_path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->createHardLink(src_path, dst_path, tx); + tx->commit(); } void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path) @@ -370,19 +374,23 @@ void DiskObjectStorage::setReadOnly(const String & path) bool DiskObjectStorage::isDirectory(const String & path) const { - return metadata_disk->isDirectory(path); + return metadata_storage->isDirectory(path); } void DiskObjectStorage::createDirectory(const String & path) { - metadata_disk->createDirectory(path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->createDirectory(path, tx); + tx->commit(); } void DiskObjectStorage::createDirectories(const String & path) { - metadata_disk->createDirectories(path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->createDicrectoryRecursive(path, tx); + tx->commit(); } @@ -396,13 +404,15 @@ void DiskObjectStorage::clearDirectory(const String & path) void DiskObjectStorage::removeDirectory(const String & path) { - metadata_disk->removeDirectory(path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->removeDirectory(path, tx); + tx->commit(); } -DiskDirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path) +DirectoryIteratorPtr DiskObjectStorage::iterateDirectory(const String & path) { - return metadata_disk->iterateDirectory(path); + return metadata_storage->iterateDirectory(path); } @@ -415,23 +425,25 @@ void DiskObjectStorage::listFiles(const String & path, std::vector & fil void DiskObjectStorage::setLastModified(const String & path, const Poco::Timestamp & timestamp) { - metadata_disk->setLastModified(path, timestamp); + auto tx = metadata_storage->createTransaction(); + metadata_storage->setLastModified(path, timestamp, tx); + tx->commit(); } Poco::Timestamp DiskObjectStorage::getLastModified(const String & path) { - return metadata_disk->getLastModified(path); + return metadata_storage->getLastModified(path); } void DiskObjectStorage::removeMetadata(const String & path, std::vector & paths_to_remove) { - LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_disk->getPath() + path)); + LOG_TRACE(log, "Remove file by path: {}", backQuote(metadata_storage->getPath() + path)); - if (!metadata_disk->exists(path)) + if (!metadata_storage->exists(path)) throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Metadata path '{}' doesn't exist", path); - if (!metadata_disk->isFile(path)) + if (!metadata_storage->isFile(path)) throw Exception(ErrorCodes::BAD_FILE_TYPE, "Path '{}' is not a regular file", path); try @@ -470,7 +482,9 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector backQuote(path), e.nested() ? e.nested()->message() : e.message()); std::unique_lock lock(metadata_mutex); - metadata_disk->removeFile(path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->unlinkFile(path, tx); + tx->commit(); } else throw; @@ -482,7 +496,7 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde { checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks. - if (metadata_disk->isFile(path)) + if (metadata_storage->isFile(path)) { removeMetadata(path, paths_to_remove[path]); } @@ -491,7 +505,9 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde for (auto it = iterateDirectory(path); it->isValid(); it->next()) removeMetadataRecursive(it->path(), paths_to_remove); - metadata_disk->removeDirectory(path); + auto tx = metadata_storage->createTransaction(); + metadata_storage->removeDirectory(path, tx); + tx->commit(); } } @@ -525,7 +541,7 @@ ReservationPtr DiskObjectStorage::reserve(UInt64 bytes) void DiskObjectStorage::removeSharedFileIfExists(const String & path, bool delete_metadata_only) { std::vector paths_to_remove; - if (metadata_disk->exists(path)) + if (metadata_storage->exists(path)) { removeMetadata(path, paths_to_remove); if (!delete_metadata_only) diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index e7e8869bff0..5be23217e9c 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -4,6 +4,7 @@ #include #include #include +#include #include namespace CurrentMetrics @@ -30,7 +31,7 @@ public: const String & name_, const String & remote_fs_root_path_, const String & log_name, - DiskPtr metadata_disk_, + MetadataStoragePtr && mestata_storage_, ObjectStoragePtr && object_storage_, DiskType disk_type_, bool send_metadata_, @@ -47,7 +48,7 @@ public: const String & getName() const override { return name; } - const String & getPath() const override { return metadata_disk->getPath(); } + const String & getPath() const override { return metadata_storage->getPath(); } std::vector getRemotePaths(const String & local_path) const override; @@ -108,7 +109,7 @@ public: void removeFromRemoteFS(const std::vector & paths); - DiskPtr getMetadataDiskIfExistsOrSelf() override { return metadata_disk; } + MetadataStoragePtr getMetadataStorage() override { return metadata_storage; } UInt32 getRefCount(const String & path) const override; @@ -141,7 +142,7 @@ public: void removeDirectory(const String & path) override; - DiskDirectoryIteratorPtr iterateDirectory(const String & path) override; + DirectoryIteratorPtr iterateDirectory(const String & path) override; void setLastModified(const String & path, const Poco::Timestamp & timestamp) override; @@ -180,9 +181,9 @@ private: const String name; const String remote_fs_root_path; Poco::Logger * log; - DiskPtr metadata_disk; const DiskType disk_type; + MetadataStoragePtr metadata_storage; ObjectStoragePtr object_storage; UInt64 reserved_bytes = 0; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp index e863d811101..6cf2448a20d 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.cpp @@ -16,51 +16,53 @@ namespace ErrorCodes extern const int CANNOT_OPEN_FILE; } -DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_) +DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_) { - DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_); result.load(); return result; } -DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync) +DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync) { - DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_); result.save(sync); return result; } -DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) +DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) { - DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_); result.load(); if (updater(result)) result.save(sync); return result; } -DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) +DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) { - DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_storage_, metadata_file_path_); updater(result); result.save(sync); return result; } -void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) +void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater) { /// Very often we are deleting metadata from some unfinished operation (like fetch of metadata) /// in this case metadata file can be incomplete/empty and so on. It's ok to remove it in this case /// because we cannot do anything better. try { - DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_); metadata.load(); if (updater(metadata)) metadata.save(sync); - metadata_disk_->removeFile(metadata_file_path_); + auto tx = metadata_storage_->createTransaction(); + metadata_storage_->unlinkFile(metadata_file_path_, tx); + tx->commit(); } catch (Exception & ex) { @@ -71,7 +73,10 @@ void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & { LOG_INFO(&Poco::Logger::get("ObjectStorageMetadata"), "Failed to read metadata file {} before removal because it's incomplete or empty. " "It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", metadata_file_path_); - metadata_disk_->removeFile(metadata_file_path_); + + auto tx = metadata_storage_->createTransaction(); + metadata_storage_->unlinkFile(metadata_file_path_, tx); + tx->commit(); } /// If file already removed, than nothing to do @@ -82,15 +87,15 @@ void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & } } -DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite) +DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, bool overwrite) { - if (overwrite || !metadata_disk_->exists(metadata_file_path_)) + if (overwrite || !metadata_storage_->exists(metadata_file_path_)) { - return createAndStoreMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_, sync); + return createAndStoreMetadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_, sync); } else { - auto result = readMetadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_); + auto result = readMetadata(remote_fs_root_path_, metadata_storage_, metadata_file_path_); if (result.read_only) throw Exception("File is read-only: " + metadata_file_path_, ErrorCodes::PATH_ACCESS_DENIED); return result; @@ -99,8 +104,7 @@ DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNot void DiskObjectStorageMetadata::load() { - const ReadSettings read_settings; - auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */ + auto buf = metadata_storage->readFile(metadata_file_path, ReadSettings{}, 1024); /* reasonable buffer size for small file */ UInt32 version; readIntText(version, *buf); @@ -109,7 +113,7 @@ void DiskObjectStorageMetadata::load() throw Exception( ErrorCodes::UNKNOWN_FORMAT, "Unknown metadata file version. Path: {}. Version: {}. Maximum expected version: {}", - metadata_disk->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG)); + metadata_storage->getPath() + metadata_file_path, toString(version), toString(VERSION_READ_ONLY_FLAG)); assertChar('\n', *buf); @@ -132,7 +136,7 @@ void DiskObjectStorageMetadata::load() if (!remote_fs_object_path.starts_with(remote_fs_root_path)) throw Exception(ErrorCodes::UNKNOWN_FORMAT, "Path in metadata does not correspond to root path. Path: {}, root path: {}, disk path: {}", - remote_fs_object_path, remote_fs_root_path, metadata_disk->getPath()); + remote_fs_object_path, remote_fs_root_path, metadata_storage->getPath()); remote_fs_object_path = remote_fs_object_path.substr(remote_fs_root_path.size()); } @@ -154,11 +158,11 @@ void DiskObjectStorageMetadata::load() /// Load metadata by path or create empty if `create` flag is set. DiskObjectStorageMetadata::DiskObjectStorageMetadata( const String & remote_fs_root_path_, - DiskPtr metadata_disk_, + MetadataStoragePtr metadata_storage_, const String & metadata_file_path_) : remote_fs_root_path(remote_fs_root_path_) , metadata_file_path(metadata_file_path_) - , metadata_disk(metadata_disk_) + , metadata_storage(metadata_storage_) , total_size(0), ref_count(0) { } @@ -203,8 +207,10 @@ void DiskObjectStorageMetadata::saveToBuffer(WriteBuffer & buf, bool sync) /// Fsync metadata file if 'sync' flag is set. void DiskObjectStorageMetadata::save(bool sync) { - auto buf = metadata_disk->writeFile(metadata_file_path, 1024); + auto tx = metadata_storage->createTransaction(); + auto buf = metadata_storage->writeFile(metadata_file_path, tx, 1024); saveToBuffer(*buf, sync); + tx->commit(); } std::string DiskObjectStorageMetadata::serializeToString() diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h index d6e791bd53f..79fb1dfc241 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadata.h @@ -1,6 +1,7 @@ #pragma once #include +#include #include namespace DB @@ -24,7 +25,7 @@ struct DiskObjectStorageMetadata /// Relative path to metadata file on local FS. const String metadata_file_path; - DiskPtr metadata_disk; + MetadataStoragePtr metadata_storage; /// Total size of all remote FS (S3, HDFS) objects. size_t total_size = 0; @@ -40,18 +41,18 @@ struct DiskObjectStorageMetadata DiskObjectStorageMetadata( const String & remote_fs_root_path_, - DiskPtr metadata_disk_, + MetadataStoragePtr metadata_storage_, const String & metadata_file_path_); void addObject(const String & path, size_t size); - static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_); - static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater); - static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater); + static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_); + static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater); + static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater); - static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync); - static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater); - static DiskObjectStorageMetadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite); + static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync); + static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, Updater updater); + static DiskObjectStorageMetadata createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, MetadataStoragePtr metadata_storage_, const String & metadata_file_path_, bool sync, bool overwrite); /// Serialize metadata to string (very same with saveToBuffer) std::string serializeToString(); diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp index a36c987db7d..931247ef069 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp @@ -81,7 +81,7 @@ void DiskObjectStorageMetadataHelper::updateObjectMetadata(const String & key, c void DiskObjectStorageMetadataHelper::migrateFileToRestorableSchema(const String & path) const { - LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_disk->getPath() + path); + LOG_TRACE(disk->log, "Migrate file {} to restorable schema", disk->metadata_storage->getPath() + path); auto meta = disk->readMetadata(path); @@ -97,7 +97,7 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const S { checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks. - LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_disk->getPath() + path); + LOG_TRACE(disk->log, "Migrate directory {} to restorable schema", disk->metadata_storage->getPath() + path); bool dir_contains_only_files = true; for (auto it = disk->iterateDirectory(path); it->isValid(); it->next()) @@ -223,7 +223,9 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur restoreFiles(source_object_storage, information); restoreFileOperations(source_object_storage, information); - disk->metadata_disk->removeFile(RESTORE_FILE_NAME); + auto tx = disk->metadata_storage->createTransaction(); + disk->metadata_storage->unlinkFile(RESTORE_FILE_NAME, tx); + tx->commit(); saveSchemaVersion(RESTORABLE_SCHEMA_VERSION); @@ -239,7 +241,7 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur void DiskObjectStorageMetadataHelper::readRestoreInformation(RestoreInformation & restore_information) /// NOLINT { - auto buffer = disk->metadata_disk->readFile(RESTORE_FILE_NAME, ReadSettings{}, 512); + auto buffer = disk->metadata_storage->readFile(RESTORE_FILE_NAME, ReadSettings{}, 512); buffer->next(); try @@ -438,9 +440,11 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc void DiskObjectStorage::onFreeze(const String & path) { createDirectories(path); - auto revision_file_buf = metadata_disk->writeFile(path + "revision.txt", 32); + auto tx = metadata_storage->createTransaction(); + auto revision_file_buf = metadata_storage->writeFile(path + "revision.txt", tx, 32); writeIntText(metadata_helper->revision_counter.load(), *revision_file_buf); revision_file_buf->finalize(); + tx->commit(); } static String pathToDetached(const String & source_path) @@ -531,6 +535,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou { Strings not_finished_prefixes{"tmp_", "delete_tmp_", "attaching_", "deleting_"}; + auto tx = disk->metadata_storage->createTransaction(); for (const auto & path : renames) { /// Skip already detached parts. @@ -557,12 +562,13 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou to_path /= from_path.filename(); /// to_path may exist and non-empty in case for example abrupt restart, so remove it before rename - if (disk->metadata_disk->exists(to_path)) - disk->metadata_disk->removeRecursive(to_path); + if (disk->metadata_storage->exists(to_path)) + disk->metadata_storage->removeRecursive(to_path, tx); disk->createDirectories(directoryPath(to_path)); - disk->metadata_disk->moveDirectory(from_path, to_path); + disk->metadata_storage->moveDirectory(from_path, to_path, tx); } + tx->commit(); } LOG_INFO(disk->log, "File operations restored for disk {}", disk->name); diff --git a/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp b/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp index 04862e43c65..96da3978dfc 100644 --- a/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp +++ b/src/Disks/ObjectStorages/HDFS/registerDiskHDFS.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include @@ -31,17 +32,20 @@ void registerDiskHDFS(DiskFactory & factory) config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000), context_->getSettingsRef().hdfs_replication ); + + /// FIXME Cache currently unsupported :( ObjectStoragePtr hdfs_storage = std::make_unique(nullptr, uri, std::move(settings), config); auto metadata_disk = prepareForLocalMetadata(name, config, config_prefix, context_).second; + auto metadata_storage = std::make_shared(metadata_disk); uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16); return std::make_shared( name, uri, "DiskHDFS", - metadata_disk, + std::move(metadata_storage), std::move(hdfs_storage), DiskType::HDFS, /* send_metadata = */ false, diff --git a/src/Disks/ObjectStorages/IMetadataStorage.h b/src/Disks/ObjectStorages/IMetadataStorage.h index 83c704ad79e..0fe841c9b47 100644 --- a/src/Disks/ObjectStorages/IMetadataStorage.h +++ b/src/Disks/ObjectStorages/IMetadataStorage.h @@ -1,5 +1,5 @@ #pragma once - = + #include #include #include @@ -8,6 +8,7 @@ #include #include #include +#include #include namespace DB @@ -17,6 +18,7 @@ struct IMetadataOperation { virtual void execute() = 0; virtual void undo() = 0; + virtual void finalize() {} virtual ~IMetadataOperation() = default; }; @@ -38,13 +40,17 @@ class IMetadataStorage : private boost::noncopyable { public: - MetadataTransactionPtr createTransaction() const; + virtual MetadataTransactionPtr createTransaction() const = 0; + virtual const std::string & getPath() const = 0; virtual bool exists(const std::string & path) const = 0; virtual bool isFile(const std::string & path) const = 0; virtual bool isDirectory(const std::string & path) const = 0; virtual Poco::Timestamp getLastModified(const std::string & path) const = 0; + virtual std::vector listDirectory(const std::string & path) const = 0; + virtual DirectoryIteratorPtr iterateDirectory(const String & path) = 0; + virtual std::unique_ptr readFile( /// NOLINT const std::string & path, @@ -68,15 +74,19 @@ public: virtual void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) = 0; - virtual void createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0; + virtual void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) = 0; + + virtual void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0; virtual void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0; + virtual void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0; + virtual void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) = 0; virtual ~IMetadataStorage() = default; - - }; +using MetadataStoragePtr = std::shared_ptr; + } diff --git a/src/Disks/ObjectStorages/LocalDiskMetadataStorage.cpp b/src/Disks/ObjectStorages/LocalDiskMetadataStorage.cpp deleted file mode 100644 index f2ae1a46b63..00000000000 --- a/src/Disks/ObjectStorages/LocalDiskMetadataStorage.cpp +++ /dev/null @@ -1,292 +0,0 @@ -#include -#include -#include - -namespace DB -{ - -namespace -{ - -class SetLastModifiedOperation final : public IMetadataOperation -{ - std::string path; - Poco::Timestamp new_timestamp; - Poco::Timestamp old_timestamp; - IDisk & disk; -public: - SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_) - : path(path_) - , new_timestamp(new_timestamp_) - , disk(disk_) - {} - - void execute() override - { - old_timestamp = disk.getLastModified(path); - disk.setLastModified(path, new_timestamp); - } - - void undo() override - { - disk.setLastModified(path, old_timestamp); - } -}; - -class UnlinkFileOperation final : public IMetadataOperation -{ - std::string path; - IDisk & disk; -public: - UnlinkFileOperation(const std::string & path_, IDisk & disk_) - : path(path_) - , disk(disk_) - { - } - - void execute() override - { - disk.removeFile(path); - } - - - void undo() override - { - /// TODO Do something with it - } -}; - -class CreateDirectoryOperation final : public IMetadataOperation -{ -private: - std::string path; - IDisk & disk; -public: - - CreateDirectoryOperation(const std::string & path_, IDisk & disk_) - : path(path_) - , disk(disk_) - { - } - - void execute() override - { - disk.createDirectory(path); - } - - void undo() override - { - disk.removeDirectory(path); - } -}; - -class CreateDirectoryRecursiveOperation : public IMetadataOperation -{ -private: - std::string path; - std::vector paths_created; - IDisk & disk; -public: - - CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_) - : path(path_) - , disk(disk_) - { - } - - void execute() override - { - namespace fs = std::filesystem; - fs::path p(path); - while (!disk.exists(p)) - { - disk.createDirectory(p); - paths_created.push_back(p); - if (!p.has_parent_path()) - break; - p = p.parent_path(); - } - } - - void undo() override - { - for (const auto & path_created : paths_created) - disk.removeDirectory(path_created); - } -}; - -class RemoveDirectoryOperation : public IMetadataOperation -{ -private: - std::string path; - IDisk & disk; -public: - RemoveDirectoryOperation(const std::string & path_, IDisk & disk_) - : path(path_) - , disk(disk_) - {} - - void execute() override - { - disk.removeDirectory(path); - } - - void undo() override - { - disk.createDirectory(path); - } -}; - -class CreateHardlinkOperation : public IMetadataOperation -{ -private: - std::string path_from; - std::string path_to; - IDisk & disk; -public: - CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) - : path_from(path_from_) - , path_to(path_to_) - , disk(disk_) - {} - - void execute() override - { - disk.createHardLink(path_from, path_to); - } - - void undo() override - { - disk.removeFile(path_to); - } -}; - -class MoveFileOperation : public IMetadataOperation -{ -private: - std::string path_from; - std::string path_to; - IDisk & disk; -public: - MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) - : path_from(path_from_) - , path_to(path_to_) - , disk(disk_) - {} - - void execute() override - { - disk.moveFile(path_from, path_to); - } - - void undo() override - { - disk.moveFile(path_to, path_from); - } -}; - -class ReplaceFileOperation : public IMetadataOperation -{ -private: - std::string path_from; - std::string path_to; - IDisk & disk; -public: - ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) - : path_from(path_from_) - , path_to(path_to_) - , disk(disk_) - {} - - void execute() override - { - disk.replaceFile(path_from, path_to); - } - - void undo() override - { - /// TODO something with this - } -}; - -class WriteFileOperation: public IMetadataOperation -{ -private: - std::string path; - std::string temp_path; - IDisk & disk; -public: - WriteFileOperation(const std::string & path_, const std::string & temp_path_, IDisk & disk_) - : path(path_) - , temp_path(temp_path_) - , disk(disk_) - {} - - void execute() override - { - disk.moveFile(temp_path, path); - } - - void undo() override - { - disk.removeFileIfExists(path); - disk.removeFileIfExists(temp_path); - } -}; - -} - -std::unique_ptr LocalDiskMetadataStorage::writeFile( /// NOLINT - const std::string & path, - MetadataTransactionPtr transaction, - size_t buf_size, - const WriteSettings & settings) -{ - std::string temp_path = path + "_tmp"; - transaction->addOperation(std::make_unique(path, temp_path, *local_disk)); - return local_disk->writeFile(temp_path, buf_size, WriteMode::Rewrite, settings); -} - - -void LocalDiskMetadataStorage::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path, timestamp, *local_disk)); -} - -void LocalDiskMetadataStorage::unlinkFile(const std::string & path, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path, *local_disk)); -} - -void LocalDiskMetadataStorage::createDirectory(const std::string & path, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path, *local_disk)); -} - -void LocalDiskMetadataStorage::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path, *local_disk)); -} - -void LocalDiskMetadataStorage::removeDirectory(const std::string & path, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path, *local_disk)); -} - -void LocalDiskMetadataStorage::createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path_from, path_to, *local_disk)); -} - -void LocalDiskMetadataStorage::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path_from, path_to, *local_disk)); -} - -void LocalDiskMetadataStorage::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) -{ - transaction->addOperation(std::make_unique(path_from, path_to, *local_disk)); -} - - -} diff --git a/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp new file mode 100644 index 00000000000..e76541227cf --- /dev/null +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.cpp @@ -0,0 +1,558 @@ +#include +#include +#include +#include +#include + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FS_METADATA_ERROR; +} + + +std::string toString(MetadataFromDiskTransactionState state) +{ + switch (state) + { + case MetadataFromDiskTransactionState::PREPARING: + return "PREPARING"; + case MetadataFromDiskTransactionState::FAILED: + return "FAILED"; + case MetadataFromDiskTransactionState::COMMITTED: + return "COMMITTED"; + case MetadataFromDiskTransactionState::ROLLED_BACK: + return "ROLLED_BACK"; + case MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK: + return "PARTIALLY_ROLLED_BACK"; + } + __builtin_unreachable(); +} + +namespace +{ + +std::string getTempFileName() +{ + std::string temp_filepath; + std::string dummy_prefix = "a/"; + temp_filepath = Poco::TemporaryFile::tempName(dummy_prefix); + dummy_prefix += "tmp"; + assert(temp_filepath.starts_with(dummy_prefix)); + temp_filepath.replace(0, dummy_prefix.length(), "tmp"); + return temp_filepath; +} + +class SetLastModifiedOperation final : public IMetadataOperation +{ + std::string path; + Poco::Timestamp new_timestamp; + Poco::Timestamp old_timestamp; + IDisk & disk; +public: + SetLastModifiedOperation(const std::string & path_, Poco::Timestamp new_timestamp_, IDisk & disk_) + : path(path_) + , new_timestamp(new_timestamp_) + , disk(disk_) + {} + + void execute() override + { + old_timestamp = disk.getLastModified(path); + disk.setLastModified(path, new_timestamp); + } + + void undo() override + { + disk.setLastModified(path, old_timestamp); + } +}; + +class UnlinkFileOperation final : public IMetadataOperation +{ + std::string path; + IDisk & disk; + std::string temp_filepath; +public: + UnlinkFileOperation(const std::string & path_, IDisk & disk_) + : path(path_) + , disk(disk_) + , temp_filepath(getTempFileName()) + { + } + + void execute() override + { + disk.moveFile(path, temp_filepath); + } + + void undo() override + { + disk.moveFile(temp_filepath, path); + } + + void finalize() override + { + disk.removeFileIfExists(temp_filepath); + } +}; + +class CreateDirectoryOperation final : public IMetadataOperation +{ +private: + std::string path; + IDisk & disk; +public: + CreateDirectoryOperation(const std::string & path_, IDisk & disk_) + : path(path_) + , disk(disk_) + { + } + + void execute() override + { + disk.createDirectory(path); + } + + void undo() override + { + disk.removeDirectory(path); + } +}; + +class CreateDirectoryRecursiveOperation : public IMetadataOperation +{ +private: + std::string path; + std::vector paths_created; + IDisk & disk; +public: + CreateDirectoryRecursiveOperation(const std::string & path_, IDisk & disk_) + : path(path_) + , disk(disk_) + { + } + + void execute() override + { + namespace fs = std::filesystem; + fs::path p(path); + while (!disk.exists(p)) + { + disk.createDirectory(p); + paths_created.push_back(p); + if (!p.has_parent_path()) + break; + p = p.parent_path(); + } + } + + void undo() override + { + for (const auto & path_created : paths_created) + disk.removeDirectory(path_created); + } +}; + +class RemoveDirectoryOperation : public IMetadataOperation +{ +private: + std::string path; + IDisk & disk; +public: + RemoveDirectoryOperation(const std::string & path_, IDisk & disk_) + : path(path_) + , disk(disk_) + {} + + void execute() override + { + disk.removeDirectory(path); + } + + void undo() override + { + disk.createDirectory(path); + } +}; + +class RemoveRecursiveOperation : public IMetadataOperation +{ + std::string path; + IDisk & disk; + std::string temp_path; +public: + RemoveRecursiveOperation(const std::string & path_, IDisk & disk_) + : path(path_) + , disk(disk_) + , temp_path(getTempFileName()) + { + } + + void execute() override + { + if (disk.isFile(path)) + disk.moveFile(path, temp_path); + if (disk.isDirectory(path)) + disk.moveDirectory(path, temp_path); + } + + void undo() override + { + if (disk.isFile(temp_path)) + disk.moveFile(temp_path, path); + if (disk.isDirectory(temp_path)) + disk.moveDirectory(temp_path, path); + } + + void finalize() override + { + if (disk.exists(temp_path)) + disk.removeRecursive(temp_path); + + if (disk.exists(path)) + disk.removeRecursive(path); + } +}; + + +class CreateHardlinkOperation : public IMetadataOperation +{ +private: + std::string path_from; + std::string path_to; + IDisk & disk; +public: + CreateHardlinkOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) + : path_from(path_from_) + , path_to(path_to_) + , disk(disk_) + {} + + void execute() override + { + disk.createHardLink(path_from, path_to); + } + + void undo() override + { + disk.removeFile(path_to); + } +}; + +class MoveFileOperation : public IMetadataOperation +{ +private: + std::string path_from; + std::string path_to; + IDisk & disk; +public: + MoveFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) + : path_from(path_from_) + , path_to(path_to_) + , disk(disk_) + {} + + void execute() override + { + disk.moveFile(path_from, path_to); + } + + void undo() override + { + disk.moveFile(path_to, path_from); + } +}; + +class MoveDirectoryOperation : public IMetadataOperation +{ +private: + std::string path_from; + std::string path_to; + IDisk & disk; +public: + MoveDirectoryOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) + : path_from(path_from_) + , path_to(path_to_) + , disk(disk_) + {} + + void execute() override + { + disk.moveDirectory(path_from, path_to); + } + + void undo() override + { + disk.moveDirectory(path_to, path_from); + } +}; + + +class ReplaceFileOperation : public IMetadataOperation +{ +private: + std::string path_from; + std::string path_to; + IDisk & disk; + std::string temp_path_to; +public: + ReplaceFileOperation(const std::string & path_from_, const std::string & path_to_, IDisk & disk_) + : path_from(path_from_) + , path_to(path_to_) + , disk(disk_) + , temp_path_to(getTempFileName()) + { + } + + void execute() override + { + if (disk.exists(path_to)) + disk.moveFile(path_to, temp_path_to); + + disk.replaceFile(path_from, path_to); + } + + void undo() override + { + disk.moveFile(path_to, path_from); + disk.moveFile(temp_path_to, path_to); + } + + void finalize() override + { + disk.removeFileIfExists(temp_path_to); + } +}; + +class WriteFileOperation: public IMetadataOperation +{ +private: + std::string path; + std::string temp_path; + IDisk & disk; +public: + WriteFileOperation(const std::string & path_, const std::string & temp_path_, IDisk & disk_) + : path(path_) + , temp_path(temp_path_) + , disk(disk_) + {} + + void execute() override + { + disk.moveFile(temp_path, path); + } + + void undo() override + { + disk.removeFileIfExists(path); + disk.removeFileIfExists(temp_path); + } +}; + +} + +std::unique_ptr MetadataStorageFromDisk::writeFile( /// NOLINT + const std::string & path, + MetadataTransactionPtr transaction, + size_t buf_size, + const WriteSettings & settings) +{ + std::string temp_path = getTempFileName(); + transaction->addOperation(std::make_unique(path, temp_path, *disk)); + return disk->writeFile(temp_path, buf_size, WriteMode::Rewrite, settings); +} + + +void MetadataStorageFromDiskTransaction::addOperation(MetadataOperationPtr && operation) +{ + if (state != MetadataFromDiskTransactionState::PREPARING) + throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot add operations to transaction in {} state, it should be in {} state", + toString(state), toString(MetadataFromDiskTransactionState::PREPARING)); + + operations.emplace_back(std::move(operation)); +} + +void MetadataStorageFromDiskTransaction::commit() +{ + if (state != MetadataFromDiskTransactionState::COMMITTED) + throw Exception(ErrorCodes::FS_METADATA_ERROR, "Cannot commit transaction in {} state, it should be in {} state", + toString(state), toString(MetadataFromDiskTransactionState::COMMITTED)); + + for (size_t i = 0; i < operations.size(); ++i) + { + try + { + operations[i]->execute(); + } + catch (Exception & ex) + { + ex.addMessage(fmt::format("While commiting operation #{}", i)); + failed_operation_index = i; + state = MetadataFromDiskTransactionState::FAILED; + throw; + } + } + + /// Do it in "best effort" mode + for (size_t i = 0; i < operations.size(); ++i) + { + try + { + operations[i]->finalize(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__, fmt::format("Failed to finalize operation #{}", i)); + } + } + + state = MetadataFromDiskTransactionState::COMMITTED; +} + +void MetadataStorageFromDiskTransaction::rollback() +{ + /// Otherwise everything is alright + if (state == MetadataFromDiskTransactionState::FAILED) + { + if (!failed_operation_index.has_value()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Transaction in failed state, but has not failed operations. It's a bug"); + + for (int64_t i = failed_operation_index.value(); i >= 0; --i) + { + try + { + operations[i]->undo(); + } + catch (Exception & ex) + { + state = MetadataFromDiskTransactionState::PARTIALLY_ROLLED_BACK; + ex.addMessage(fmt::format("While rolling back operation #{}", i)); + throw; + } + } + } + else + { + /// Nothing to do, transaction commited or not even started to commit + } + + state = MetadataFromDiskTransactionState::ROLLED_BACK; +} + +MetadataStorageFromDiskTransaction::~MetadataStorageFromDiskTransaction() +{ + if (state == MetadataFromDiskTransactionState::FAILED) + { + try + { + rollback(); + } + catch (...) + { + tryLogCurrentException(__PRETTY_FUNCTION__); + } + } +} + +const std::string & MetadataStorageFromDisk::getPath() const +{ + return disk->getPath(); +} + +bool MetadataStorageFromDisk::exists(const std::string & path) const +{ + return disk->exists(path); +} + +bool MetadataStorageFromDisk::isFile(const std::string & path) const +{ + return disk->isFile(path); +} + + +bool MetadataStorageFromDisk::isDirectory(const std::string & path) const +{ + return disk->isDirectory(path); +} + +Poco::Timestamp MetadataStorageFromDisk::getLastModified(const std::string & path) const +{ + return disk->getLastModified(path); +} + +std::vector MetadataStorageFromDisk::listDirectory(const std::string & path) const +{ + std::vector result_files; + disk->listFiles(path, result_files); + return result_files; +} + +DirectoryIteratorPtr MetadataStorageFromDisk::iterateDirectory(const std::string & path) +{ + return disk->iterateDirectory(path); +} + +std::unique_ptr MetadataStorageFromDisk::readFile( /// NOLINT + const std::string & path, + const ReadSettings & settings, + std::optional read_hint, + std::optional file_size) const +{ + return disk->readFile(path, settings, read_hint, file_size); +} + + +void MetadataStorageFromDisk::setLastModified(const std::string & path, const Poco::Timestamp & timestamp, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, timestamp, *disk)); +} + +void MetadataStorageFromDisk::unlinkFile(const std::string & path, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, *disk)); +} + +void MetadataStorageFromDisk::removeRecursive(const std::string & path, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, *disk)); +} + +void MetadataStorageFromDisk::createDirectory(const std::string & path, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, *disk)); +} + +void MetadataStorageFromDisk::createDicrectoryRecursive(const std::string & path, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, *disk)); +} + +void MetadataStorageFromDisk::removeDirectory(const std::string & path, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path, *disk)); +} + +void MetadataStorageFromDisk::createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path_from, path_to, *disk)); +} + +void MetadataStorageFromDisk::moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path_from, path_to, *disk)); +} + +void MetadataStorageFromDisk::moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path_from, path_to, *disk)); +} + +void MetadataStorageFromDisk::replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) +{ + transaction->addOperation(std::make_unique(path_from, path_to, *disk)); +} + +} diff --git a/src/Disks/ObjectStorages/LocalDiskMetadataStorage.h b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h similarity index 55% rename from src/Disks/ObjectStorages/LocalDiskMetadataStorage.h rename to src/Disks/ObjectStorages/MetadataStorageFromDisk.h index 46f1da25e9f..a73965b8c55 100644 --- a/src/Disks/ObjectStorages/LocalDiskMetadataStorage.h +++ b/src/Disks/ObjectStorages/MetadataStorageFromDisk.h @@ -1,83 +1,71 @@ #pragma once #include -#include +#include namespace DB { -enum class LocalMetadataTransactionState +enum class MetadataFromDiskTransactionState { PREPARING, - COMMITED, - ROLLED_BACK + FAILED, + COMMITTED, + ROLLED_BACK, + PARTIALLY_ROLLED_BACK, }; -struct LocalDiskMetadataTransaction : public IMetadataTransaction +std::string toString(MetadataFromDiskTransactionState state); + +struct MetadataStorageFromDiskTransaction : public IMetadataTransaction { private: - std::vector operations; - LocalMetadataTransactionState state{LocalMetadataTransactionState::PREPARING}; -public: - void addOperation(MetadataOperationPtr && operation) override - { - operations.emplace_back(std::move(operation)); - } + std::optional failed_operation_index; + std::vector operations; + MetadataFromDiskTransactionState state{MetadataFromDiskTransactionState::PREPARING}; +public: + void addOperation(MetadataOperationPtr && operation) override; void commit() override; void rollback() override; + + ~MetadataStorageFromDiskTransaction() override; }; -class LocalDiskMetadataStorage : public IMetadataStorage +class MetadataStorageFromDisk : public IMetadataStorage { private: - DiskPtr local_disk; + DiskPtr disk; public: - explicit LocalDiskMetadataStorage(DiskPtr local_disk_) - : local_disk(local_disk_) + explicit MetadataStorageFromDisk(DiskPtr disk_) + : disk(disk_) { } - MetadataTransactionPtr createTransaction() const + MetadataTransactionPtr createTransaction() const override { - return std::make_shared(local_disk); + return std::make_shared(); } - bool exists(const std::string & path) const override - { - return local_disk->exists(path); - } + const std::string & getPath() const override; - bool isFile(const std::string & path) const override - { - return local_disk->isFile(path); - } + bool exists(const std::string & path) const override; - bool isDirectory(const std::string & path) const override - { - return local_disk->isDirectory(path); - } + bool isFile(const std::string & path) const override; - Poco::Timestamp getLastModified(const std::string & path) const override - { - return local_disk->getLastModified(path); - } + bool isDirectory(const std::string & path) const override; - std::vector listDirectory(const std::string & path) const override - { - std::vector result_files; - local_disk->listFiles(path, result_files); - return result_files; - } + Poco::Timestamp getLastModified(const std::string & path) const override; + + std::vector listDirectory(const std::string & path) const override; + + DirectoryIteratorPtr iterateDirectory(const std::string & path) override; std::unique_ptr readFile( /// NOLINT const std::string & path, const ReadSettings & settings = ReadSettings{}, std::optional read_hint = {}, - std::optional file_size = {}) const override - { - return local_disk->readFile(path, settings, read_hint, file_size); - } + std::optional file_size = {}) const override; std::unique_ptr writeFile( /// NOLINT const std::string & path, @@ -95,10 +83,14 @@ public: void removeDirectory(const std::string & path, MetadataTransactionPtr transaction) override; - void createHardlink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override; + void removeRecursive(const std::string & path, MetadataTransactionPtr transaction) override; + + void createHardLink(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override; void moveFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override; + void moveDirectory(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override; + void replaceFile(const std::string & path_from, const std::string & path_to, MetadataTransactionPtr transaction) override; }; diff --git a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp index d7e82ef3392..49c2d68f078 100644 --- a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp +++ b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -85,6 +86,8 @@ void registerDiskS3(DiskFactory & factory) auto [metadata_path, metadata_disk] = prepareForLocalMetadata(name, config, config_prefix, context); + auto metadata_storage = std::make_shared(metadata_disk); + FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context); ObjectStoragePtr s3_storage = std::make_unique( @@ -99,7 +102,7 @@ void registerDiskS3(DiskFactory & factory) name, uri.key, "DiskS3", - metadata_disk, + std::move(metadata_storage), std::move(s3_storage), DiskType::S3, send_metadata, diff --git a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp index 4f4a99f1b01..9f791db0b69 100644 --- a/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp +++ b/src/Storages/MergeTree/MergeTreeIndexGranularityInfo.cpp @@ -17,7 +17,7 @@ std::optional MergeTreeIndexGranularityInfo::getMarksExtensionFromF { if (disk->exists(path_to_part)) { - for (DiskDirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next()) + for (DirectoryIteratorPtr it = disk->iterateDirectory(path_to_part); it->isValid(); it->next()) { const auto & ext = fs::path(it->path()).extension(); if (ext == getNonAdaptiveMrkExtension() diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 1d4b22d4a59..464c18563c3 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -10,6 +10,8 @@ #include #include +#include + #include #include @@ -8154,10 +8156,12 @@ public: void save(DiskPtr data_disk, const String & path) const { - auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf(); + auto metadata_storage = data_disk->getMetadataStorage(); auto file_path = getFileName(path); - auto buffer = metadata_disk->writeFile(file_path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite); + auto tx = metadata_storage->createTransaction(); + auto buffer = metadata_storage->writeFile(file_path, tx, DBMS_DEFAULT_BUFFER_SIZE); + writeIntText(version, *buffer); buffer->write("\n", 1); writeBoolText(is_replicated, *buffer); @@ -8170,16 +8174,18 @@ public: buffer->write("\n", 1); writeString(table_shared_id, *buffer); buffer->write("\n", 1); + + tx->commit(); } bool load(DiskPtr data_disk, const String & path) { - auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf(); + auto metadata_storage = data_disk->getMetadataStorage(); auto file_path = getFileName(path); - if (!metadata_disk->exists(file_path)) + if (!metadata_storage->exists(file_path)) return false; - auto buffer = metadata_disk->readFile(file_path, ReadSettings(), {}); + auto buffer = metadata_storage->readFile(file_path, ReadSettings(), {}); readIntText(version, *buffer); if (version != 1) { @@ -8202,8 +8208,14 @@ public: static void clean(DiskPtr data_disk, const String & path) { - auto metadata_disk = data_disk->getMetadataDiskIfExistsOrSelf(); - metadata_disk->removeFileIfExists(getFileName(path)); + auto metadata_storage = data_disk->getMetadataStorage(); + auto fname = getFileName(path); + if (metadata_storage->exists(fname)) + { + auto tx = metadata_storage->createTransaction(); + metadata_storage->unlinkFile(fname, tx); + tx->commit(); + } } private: