diff --git a/src/Disks/DiskObjectStorage.cpp b/src/Disks/DiskObjectStorage.cpp index 8fbde6dc6ca..04adebf1e82 100644 --- a/src/Disks/DiskObjectStorage.cpp +++ b/src/Disks/DiskObjectStorage.cpp @@ -569,16 +569,6 @@ void DiskObjectStorage::startup() LOG_INFO(log, "Starting up disk {}", name); object_storage->startup(); - if (send_metadata) - { - metadata_helper->restore(); - - if (metadata_helper->readSchemaVersion(remote_fs_root_path) < DiskObjectStorageMetadataHelper::RESTORABLE_SCHEMA_VERSION) - metadata_helper->migrateToRestorableSchema(); - - metadata_helper->findLastRevision(); - } - LOG_INFO(log, "Disk {} started up", name); } @@ -674,6 +664,26 @@ void DiskObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration object_storage->applyNewSettings(config, "storage_configuration.disks." + name, context_); } +void DiskObjectStorage::restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) +{ + if (send_metadata) + { + LOG_DEBUG(log, "START RESTORING METADATA"); + metadata_helper->restore(config, config_prefix, context); + + if (metadata_helper->readSchemaVersion(object_storage.get(), remote_fs_root_path) < DiskObjectStorageMetadataHelper::RESTORABLE_SCHEMA_VERSION) + { + LOG_DEBUG(log, "DONE READING"); + metadata_helper->migrateToRestorableSchema(); + LOG_DEBUG(log, "MIGRATION FINISHED"); + } + + LOG_DEBUG(log, "SEARCHING LAST REVISION"); + metadata_helper->findLastRevision(); + LOG_DEBUG(log, "DONE RESTORING METADATA"); + } +} + DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const { if (i != 0) @@ -750,14 +760,14 @@ void DiskObjectStorageMetadataHelper::findLastRevision() LOG_INFO(disk->log, "Found last revision number {} for disk {}", revision_counter, disk->name); } -int DiskObjectStorageMetadataHelper::readSchemaVersion(const String & source_path) const +int DiskObjectStorageMetadataHelper::readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const { const std::string path = source_path + SCHEMA_VERSION_OBJECT; int version = 0; - if (!disk->object_storage->exists(path)) + if (!object_storage->exists(path)) return version; - auto buf = disk->object_storage->readObject(path); + auto buf = object_storage->readObject(path); readIntText(version, *buf); return version; @@ -800,20 +810,22 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchemaRecursive(const S bool dir_contains_only_files = true; for (auto it = disk->iterateDirectory(path); it->isValid(); it->next()) + { if (disk->isDirectory(it->path())) { dir_contains_only_files = false; break; } + } /// The whole directory can be migrated asynchronously. if (dir_contains_only_files) { auto result = disk->getExecutor().execute([this, path] - { - for (auto it = disk->iterateDirectory(path); it->isValid(); it->next()) - migrateFileToRestorableSchema(it->path()); - }); + { + for (auto it = disk->iterateDirectory(path); it->isValid(); it->next()) + migrateFileToRestorableSchema(it->path()); + }); results.push_back(std::move(result)); } @@ -863,15 +875,18 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchema() } } -void DiskObjectStorageMetadataHelper::restore() +void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) { if (!disk->exists(RESTORE_FILE_NAME)) + { return; + } try { RestoreInformation information; information.source_path = disk->remote_fs_root_path; + information.source_namespace = disk->object_storage->getObjectsNamespace(); readRestoreInformation(information); if (information.revision == 0) @@ -879,19 +894,28 @@ void DiskObjectStorageMetadataHelper::restore() if (!information.source_path.ends_with('/')) information.source_path += '/'; - /// In this case we need to additionally cleanup S3 from objects with later revision. - /// Will be simply just restore to different path. - if (information.source_path == disk->remote_fs_root_path && information.revision != LATEST_REVISION) - throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS); + IObjectStorage * source_object_storage = disk->object_storage.get(); + if (information.source_namespace == disk->object_storage->getObjectsNamespace()) + { + /// In this case we need to additionally cleanup S3 from objects with later revision. + /// Will be simply just restore to different path. + if (information.source_path == disk->remote_fs_root_path && information.revision != LATEST_REVISION) + throw Exception("Restoring to the same bucket and path is allowed if revision is latest (0)", ErrorCodes::BAD_ARGUMENTS); - /// This case complicates S3 cleanup in case of unsuccessful restore. - if (information.source_path != disk->remote_fs_root_path && disk->remote_fs_root_path.starts_with(information.source_path)) - throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS); + /// This case complicates S3 cleanup in case of unsuccessful restore. + if (information.source_path != disk->remote_fs_root_path && disk->remote_fs_root_path.starts_with(information.source_path)) + throw Exception("Restoring to the same bucket is allowed only if source path is not a sub-path of configured path in S3 disk", ErrorCodes::BAD_ARGUMENTS); + } + else + { + object_storage_from_another_namespace = disk->object_storage->cloneObjectStorage(information.source_namespace, config, config_prefix, context); + source_object_storage = object_storage_from_another_namespace.get(); + } LOG_INFO(disk->log, "Starting to restore disk {}. Revision: {}, Source path: {}", disk->name, information.revision, information.source_path); - if (readSchemaVersion(information.source_path) < RESTORABLE_SCHEMA_VERSION) + if (readSchemaVersion(source_object_storage, information.source_path) < RESTORABLE_SCHEMA_VERSION) throw Exception("Source bucket doesn't have restorable schema.", ErrorCodes::BAD_ARGUMENTS); LOG_INFO(disk->log, "Removing old metadata..."); @@ -901,8 +925,8 @@ void DiskObjectStorageMetadataHelper::restore() if (disk->exists(root)) disk->removeSharedRecursive(root + '/', !cleanup_s3, {}); - restoreFiles(information); - restoreFileOperations(information); + restoreFiles(source_object_storage, information); + restoreFileOperations(source_object_storage, information); disk->metadata_disk->removeFile(RESTORE_FILE_NAME); @@ -949,10 +973,12 @@ void DiskObjectStorageMetadataHelper::readRestoreInformation(RestoreInformation for (const auto & [key, value] : properties) { - ReadBufferFromString value_buffer (value); + ReadBufferFromString value_buffer(value); if (key == "revision") readIntText(restore_information.revision, value_buffer); + else if (key == "source_bucket" || key == "source_namespace") + readText(restore_information.source_namespace, value_buffer); else if (key == "source_path") readText(restore_information.source_path, value_buffer); else if (key == "detached") @@ -988,12 +1014,12 @@ static std::tuple extractRevisionAndOperationFromKey(const Strin return {(revision_str.empty() ? 0 : static_cast(std::bitset<64>(revision_str).to_ullong())), operation}; } -void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & restore_information) +void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information) { LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name); std::vector> results; - auto restore_files = [this, &restore_information, &results](const BlobsPathToSize & keys) + auto restore_files = [this, &source_object_storage, &restore_information, &results](const BlobsPathToSize & keys) { std::vector keys_names; for (const auto & [key, size] : keys) @@ -1012,9 +1038,9 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re if (!keys_names.empty()) { - auto result = disk->getExecutor().execute([this, &restore_information, keys_names]() + auto result = disk->getExecutor().execute([this, &source_object_storage, &restore_information, keys_names]() { - processRestoreFiles(restore_information.source_path, keys_names); + processRestoreFiles(source_object_storage, restore_information.source_path, keys_names); }); results.push_back(std::move(result)); @@ -1024,7 +1050,7 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re }; BlobsPathToSize children; - disk->object_storage->listPrefix(restore_information.source_path, children); + source_object_storage->listPrefix(restore_information.source_path, children); restore_files(children); for (auto & result : results) @@ -1036,11 +1062,11 @@ void DiskObjectStorageMetadataHelper::restoreFiles(const RestoreInformation & re } -void DiskObjectStorageMetadataHelper::processRestoreFiles(const String & source_path, std::vector keys) +void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector & keys) { for (const auto & key : keys) { - auto meta = disk->object_storage->getObjectMetadata(key); + auto meta = source_object_storage->getObjectMetadata(key); auto object_attributes = meta.attributes; String path; @@ -1066,7 +1092,7 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(const String & source_ /// Copy object if we restore to different bucket / path. if (disk->remote_fs_root_path != source_path) - disk->object_storage->copyObject(key, disk->remote_fs_root_path + relative_key); + source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage); auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata) { @@ -1088,13 +1114,13 @@ static String pathToDetached(const String & source_path) return fs::path(source_path).parent_path() / "detached/"; } -void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInformation & restore_information) +void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information) { /// Enable recording file operations if we restore to different bucket / path. - bool send_metadata = disk->remote_fs_root_path != restore_information.source_path; + bool send_metadata = source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != restore_information.source_path; std::set renames; - auto restore_file_operations = [this, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys) + auto restore_file_operations = [this, &source_object_storage, &restore_information, &renames, &send_metadata](const BlobsPathToSize & keys) { const String rename = "rename"; const String hardlink = "hardlink"; @@ -1117,7 +1143,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma if (send_metadata) revision_counter = revision - 1; - auto object_attributes = *(disk->object_storage->getObjectMetadata(key).attributes); + auto object_attributes = *(source_object_storage->getObjectMetadata(key).attributes); if (operation == rename) { auto from_path = object_attributes["from_path"]; @@ -1180,7 +1206,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma }; BlobsPathToSize children; - disk->object_storage->listPrefix(restore_information.source_path + "operations/", children); + source_object_storage->listPrefix(restore_information.source_path + "operations/", children); restore_file_operations(children); if (restore_information.detached) @@ -1224,5 +1250,4 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(const RestoreInforma LOG_INFO(disk->log, "File operations restored for disk {}", disk->name); } - } diff --git a/src/Disks/DiskObjectStorage.h b/src/Disks/DiskObjectStorage.h index 2147f9527d5..7e5d30dfea2 100644 --- a/src/Disks/DiskObjectStorage.h +++ b/src/Disks/DiskObjectStorage.h @@ -164,6 +164,7 @@ public: void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override; + void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); private: const String name; const String remote_fs_root_path; @@ -284,6 +285,7 @@ public: struct RestoreInformation { UInt64 revision = LATEST_REVISION; + String source_namespace; String source_path; bool detached = false; }; @@ -293,18 +295,18 @@ public: void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const; void findLastRevision(); - int readSchemaVersion(const String & source_path) const; + int readSchemaVersion(IObjectStorage * object_storage, const String & source_path) const; void saveSchemaVersion(const int & version) const; void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const; void migrateFileToRestorableSchema(const String & path) const; void migrateToRestorableSchemaRecursive(const String & path, Futures & results); void migrateToRestorableSchema(); - void restore(); + void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); void readRestoreInformation(RestoreInformation & restore_information); - void restoreFiles(const RestoreInformation & restore_information); - void processRestoreFiles(const String & source_path, std::vector keys); - void restoreFileOperations(const RestoreInformation & restore_information); + void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information); + void processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector & keys); + void restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information); std::atomic revision_counter = 0; inline static const String RESTORE_FILE_NAME = "restore"; @@ -318,6 +320,8 @@ public: DiskObjectStorage * disk; + ObjectStoragePtr object_storage_from_another_namespace; + ReadSettings read_settings; }; diff --git a/src/Disks/IObjectStorage.cpp b/src/Disks/IObjectStorage.cpp index ac8f3fc39e8..44b9430172b 100644 --- a/src/Disks/IObjectStorage.cpp +++ b/src/Disks/IObjectStorage.cpp @@ -1,5 +1,6 @@ #include #include +#include namespace DB { @@ -34,4 +35,15 @@ void IObjectStorage::removeFromCache(const std::string & path) } } +void IObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional object_to_attributes) +{ + if (&object_storage_to == this) + copyObject(object_from, object_to, object_to_attributes); + + auto in = readObject(object_from); + auto out = object_storage_to.writeObject(object_to); + copyData(*in, *out); + out->finalize(); +} + } diff --git a/src/Disks/IObjectStorage.h b/src/Disks/IObjectStorage.h index f2cc9b90294..6a66ffb622e 100644 --- a/src/Disks/IObjectStorage.h +++ b/src/Disks/IObjectStorage.h @@ -97,6 +97,8 @@ public: virtual void copyObject(const std::string & object_from, const std::string & object_to, std::optional object_to_attributes = {}) = 0; + virtual void copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional object_to_attributes = {}); + virtual ~IObjectStorage() = default; std::string getCacheBasePath() const; @@ -113,6 +115,10 @@ public: virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0; + virtual String getObjectsNamespace() const = 0; + + virtual std::unique_ptr cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) = 0; + protected: FileCachePtr cache; }; diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 6a052dfab02..54b736788fa 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -79,7 +79,7 @@ void registerDiskS3(DiskFactory & factory) getSettings(config, config_prefix, context), uri.version_id, uri.bucket); - std::shared_ptr s3disk = std::make_shared( + std::shared_ptr s3disk = std::make_shared( name, uri.key, "DiskS3", @@ -98,6 +98,9 @@ void registerDiskS3(DiskFactory & factory) s3disk->startup(); + s3disk->restoreMetadataIfNeeded(config, config_prefix, context); + + std::shared_ptr disk_result = s3disk; #ifdef NDEBUG bool use_cache = true; @@ -110,10 +113,11 @@ void registerDiskS3(DiskFactory & factory) if (config.getBool(config_prefix + ".cache_enabled", use_cache)) { String cache_path = config.getString(config_prefix + ".cache_path", context->getPath() + "disks/" + name + "/cache/"); - s3disk = wrapWithCache(s3disk, "s3-cache", cache_path, metadata_path); + disk_result = wrapWithCache(disk_result, "s3-cache", cache_path, metadata_path); } - return std::make_shared(s3disk); + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "DONE DISK"); + return std::make_shared(disk_result); }; factory.registerDiskType("s3", creator); } diff --git a/src/Disks/S3ObjectStorage.cpp b/src/Disks/S3ObjectStorage.cpp index a941022a574..0a7bd45d546 100644 --- a/src/Disks/S3ObjectStorage.cpp +++ b/src/Disks/S3ObjectStorage.cpp @@ -81,11 +81,15 @@ bool S3ObjectStorage::exists(const std::string & path) const auto object_head = requestObjectHeadData(bucket, path); if (!object_head.IsSuccess()) { - if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY) + if (object_head.GetError().GetErrorType() == Aws::S3::S3Errors::RESOURCE_NOT_FOUND) + { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "OBJECT DOESNT {} EXISTS", path); return false; + } throwIfError(object_head); } + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "OBJECT {} EXISTS", path); return true; } @@ -291,6 +295,15 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons return result; } +void S3ObjectStorage::copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional object_to_attributes) +{ + /// Shortcut for S3 + if (auto * dest_s3 = dynamic_cast(&object_storage_to); dest_s3 != nullptr) + copyObjectImpl(bucket, object_from, dest_s3->bucket, object_to, {}, object_to_attributes); + else + IObjectStorage::copyObjectToAnotherObjectStorage(object_from, object_to, object_storage_to, object_to_attributes); +} + void S3ObjectStorage::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key, std::optional head, std::optional metadata) const @@ -428,7 +441,7 @@ void S3ObjectStorage::startup() auto client_ptr = client.get(); /// Need to be enabled if it was disabled during shutdown() call. - const_cast(*client_ptr.get()).EnableRequestProcessing(); + const_cast(*client_ptr).EnableRequestProcessing(); } void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) @@ -437,6 +450,15 @@ void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & client.set(getClient(config, config_prefix, context)); } +std::unique_ptr S3ObjectStorage::cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) +{ + return std::make_unique( + nullptr, getClient(config, config_prefix, context), + getSettings(config, config_prefix, context), + version_id, new_namespace); } +} + + #endif diff --git a/src/Disks/S3ObjectStorage.h b/src/Disks/S3ObjectStorage.h index b0762d07535..7632a643130 100644 --- a/src/Disks/S3ObjectStorage.h +++ b/src/Disks/S3ObjectStorage.h @@ -17,7 +17,6 @@ namespace DB struct S3ObjectStorageSettings { - S3ObjectStorageSettings() = default; S3ObjectStorageSettings( @@ -95,9 +94,7 @@ public: void copyObject(const std::string & object_from, const std::string & object_to, std::optional object_to_attributes = {}) override; - void setNewSettings(std::unique_ptr && s3_settings_); - - void setNewClient(std::unique_ptr && client_); + void copyObjectToAnotherObjectStorage(const std::string & object_from, const std::string & object_to, IObjectStorage & object_storage_to, std::optional object_to_attributes = {}) override; void shutdown() override; @@ -105,7 +102,13 @@ public: void applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override; + String getObjectsNamespace() const override { return bucket; } + + std::unique_ptr cloneObjectStorage(const std::string & new_namespace, const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) override; private: + void setNewSettings(std::unique_ptr && s3_settings_); + + void setNewClient(std::unique_ptr && client_); void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key, std::optional head = std::nullopt,