From eba60ff38f220b6c2e9ab6142ebd4760df8e706c Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 13 May 2022 17:00:47 +0200 Subject: [PATCH] Fix restorable schema --- .../registerDiskAzureBlobStorage.cpp | 2 +- src/Disks/DiskDecorator.cpp | 4 +- src/Disks/DiskDecorator.h | 2 +- src/Disks/DiskLocal.cpp | 4 +- src/Disks/DiskLocal.h | 2 +- src/Disks/DiskObjectStorage.cpp | 114 +++++++++++++----- src/Disks/DiskRestartProxy.cpp | 7 +- src/Disks/DiskRestartProxy.h | 2 +- src/Disks/IDisk.h | 2 +- .../IO/WriteIndirectBufferFromRemoteFS.cpp | 3 +- src/Disks/S3/registerDiskS3.cpp | 4 +- src/Interpreters/InterpreterSystemQuery.cpp | 2 +- 12 files changed, 100 insertions(+), 48 deletions(-) diff --git a/src/Disks/AzureBlobStorage/registerDiskAzureBlobStorage.cpp b/src/Disks/AzureBlobStorage/registerDiskAzureBlobStorage.cpp index 8b2429263bb..56df793783e 100644 --- a/src/Disks/AzureBlobStorage/registerDiskAzureBlobStorage.cpp +++ b/src/Disks/AzureBlobStorage/registerDiskAzureBlobStorage.cpp @@ -103,7 +103,7 @@ void registerDiskAzureBlobStorage(DiskFactory & factory) checkRemoveAccess(*azure_blob_storage_disk); } - azure_blob_storage_disk->startup(); + azure_blob_storage_disk->startup(context); if (config.getBool(config_prefix + ".cache_enabled", true)) { diff --git a/src/Disks/DiskDecorator.cpp b/src/Disks/DiskDecorator.cpp index 80cfc23d210..02babfbb59f 100644 --- a/src/Disks/DiskDecorator.cpp +++ b/src/Disks/DiskDecorator.cpp @@ -211,9 +211,9 @@ void DiskDecorator::shutdown() delegate->shutdown(); } -void DiskDecorator::startup() +void DiskDecorator::startup(ContextPtr context) { - delegate->startup(); + delegate->startup(context); } void DiskDecorator::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) diff --git a/src/Disks/DiskDecorator.h b/src/Disks/DiskDecorator.h index d707eb3e51d..b86c520d5d8 100644 --- a/src/Disks/DiskDecorator.h +++ b/src/Disks/DiskDecorator.h @@ -71,7 +71,7 @@ public: void onFreeze(const String & path) override; SyncGuardPtr getDirectorySyncGuard(const String & path) const override; void shutdown() override; - void startup() override; + void startup(ContextPtr context) override; void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map) override; String getCacheBasePath() const override { return delegate->getCacheBasePath(); } std::vector getRemotePaths(const String & path) const override { return delegate->getRemotePaths(path); } diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index a55d588f2b5..e1e299a0d52 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -484,7 +484,7 @@ DiskLocal::DiskLocal( disk_checker = std::make_unique(this, context, local_disk_check_period_ms); } -void DiskLocal::startup() +void DiskLocal::startup(ContextPtr) { try { @@ -672,7 +672,7 @@ void registerDiskLocal(DiskFactory & factory) std::shared_ptr disk = std::make_shared(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0)); - disk->startup(); + disk->startup(context); return std::make_shared(disk); }; factory.registerDiskType("local", creator); diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 61faccbe2a5..101bf0e1f13 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -110,7 +110,7 @@ public: bool isBroken() const override { return broken; } - void startup() override; + void startup(ContextPtr) override; void shutdown() override; diff --git a/src/Disks/DiskObjectStorage.cpp b/src/Disks/DiskObjectStorage.cpp index 04adebf1e82..8f472c713b7 100644 --- a/src/Disks/DiskObjectStorage.cpp +++ b/src/Disks/DiskObjectStorage.cpp @@ -32,6 +32,12 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } +static String revisionToString(UInt64 revision) +{ + return std::bitset<64>(revision).to_string(); +} + + DiskObjectStorage::Metadata DiskObjectStorage::Metadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_) { Metadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_); @@ -340,16 +346,35 @@ size_t DiskObjectStorage::getFileSize(const String & path) const return readMetadata(path).total_size; } -void DiskObjectStorage::moveFile(const String & from_path, const String & to_path) +void DiskObjectStorage::moveFile(const String & from_path, const String & to_path, bool should_send_metadata) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "MOVE FILE"); if (exists(to_path)) throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS); + if (should_send_metadata) + { + auto revision = metadata_helper->revision_counter + 1; + metadata_helper->revision_counter += 1; + + const ObjectAttributes object_metadata { + {"from_path", from_path}, + {"to_path", to_path} + }; + metadata_helper->createFileOperationObject("rename", revision, object_metadata); + } + metadata_disk->moveFile(from_path, to_path); } +void DiskObjectStorage::moveFile(const String & from_path, const String & to_path) +{ + moveFile(from_path, to_path, send_metadata); +} + void DiskObjectStorage::replaceFile(const String & from_path, const String & to_path) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "REPLACE FILE"); if (exists(to_path)) { const String tmp_path = to_path + ".old"; @@ -363,6 +388,7 @@ void DiskObjectStorage::replaceFile(const String & from_path, const String & to_ void DiskObjectStorage::removeSharedFile(const String & path, bool delete_metadata_only) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Remove shared file"); std::vector paths_to_remove; removeMetadata(path, paths_to_remove); @@ -372,6 +398,7 @@ void DiskObjectStorage::removeSharedFile(const String & path, bool delete_metada void DiskObjectStorage::removeFromRemoteFS(const std::vector & paths) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "Read from remote FS"); object_storage->removeObjects(paths); } @@ -416,17 +443,35 @@ bool DiskObjectStorage::checkUniqueId(const String & id) const return checkObjectExists(id); } -void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path) +void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path, bool should_send_metadata) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "HARDLINK FILE"); readUpdateAndStoreMetadata(src_path, false, [](Metadata & metadata) { metadata.ref_count++; return true; }); + if (should_send_metadata && !dst_path.starts_with("shadow/")) + { + auto revision = metadata_helper->revision_counter + 1; + metadata_helper->revision_counter += 1; + const ObjectAttributes object_metadata { + {"src_path", src_path}, + {"dst_path", dst_path} + }; + metadata_helper->createFileOperationObject("hardlink", revision, object_metadata); + } + /// Create FS hardlink to metadata file. metadata_disk->createHardLink(src_path, dst_path); - } +void DiskObjectStorage::createHardLink(const String & src_path, const String & dst_path) +{ + createHardLink(src_path, dst_path, send_metadata); +} + + void DiskObjectStorage::setReadOnly(const String & path) { + LOG_DEBUG(&Poco::Logger::get("DEBUG"), "set readonly"); /// We should store read only flag inside metadata file (instead of using FS flag), /// because we modify metadata file when create hard-links from it. readUpdateAndStoreMetadata(path, false, [](Metadata & metadata) { metadata.read_only = true; return true; }); @@ -560,15 +605,19 @@ void DiskObjectStorage::removeMetadataRecursive(const String & path, std::unorde void DiskObjectStorage::shutdown() { + LOG_INFO(log, "Shutting down disk {}", name); object_storage->shutdown(); + LOG_INFO(log, "Disk {} shut down", name); } -void DiskObjectStorage::startup() +void DiskObjectStorage::startup(ContextPtr context) { LOG_INFO(log, "Starting up disk {}", name); object_storage->startup(); + restoreMetadataIfNeeded(context->getConfigRef(), "storage_configuration.disks." + name, context); + LOG_INFO(log, "Disk {} started up", name); } @@ -649,13 +698,24 @@ std::unique_ptr DiskObjectStorage::writeFile( { auto blob_name = getRandomASCIIString(); + std::optional object_attributes; + if (send_metadata) + { + auto revision = metadata_helper->revision_counter + 1; + metadata_helper->revision_counter++; + object_attributes = { + {"path", path} + }; + blob_name = "r" + revisionToString(revision) + "-file-" + blob_name; + } + auto create_metadata_callback = [this, path, blob_name, mode] (size_t count) { readOrCreateUpdateAndStoreMetadata(path, mode, false, [blob_name, count] (DiskObjectStorage::Metadata & metadata) { metadata.addObject(blob_name, count); return true; }); }; - return object_storage->writeObject(fs::path(remote_fs_root_path) / blob_name, {}, create_metadata_callback, buf_size, settings); + return object_storage->writeObject(fs::path(remote_fs_root_path) / blob_name, object_attributes, create_metadata_callback, buf_size, settings); } @@ -725,10 +785,6 @@ DiskObjectStorageReservation::~DiskObjectStorageReservation() } } -static String revisionToString(UInt64 revision) -{ - return std::bitset<64>(revision).to_string(); -} void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const { @@ -877,8 +933,11 @@ void DiskObjectStorageMetadataHelper::migrateToRestorableSchema() void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context) { + LOG_INFO(disk->log, "Restore operation for disk {} called", disk->name); + if (!disk->exists(RESTORE_FILE_NAME)) { + LOG_INFO(disk->log, "No restore file '{}' exists, finishing restore", RESTORE_FILE_NAME); return; } @@ -925,6 +984,7 @@ void DiskObjectStorageMetadataHelper::restore(const Poco::Util::AbstractConfigur if (disk->exists(root)) disk->removeSharedRecursive(root + '/', !cleanup_s3, {}); + LOG_INFO(disk->log, "Old metadata removed, restoring new one"); restoreFiles(source_object_storage, information); restoreFileOperations(source_object_storage, information); @@ -1024,6 +1084,9 @@ void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_objec std::vector keys_names; for (const auto & [key, size] : keys) { + + LOG_INFO(disk->log, "Calling restore for key for disk {}", key); + /// Skip file operations objects. They will be processed separately. if (key.find("/operations/") != String::npos) continue; @@ -1051,6 +1114,7 @@ void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_objec BlobsPathToSize children; source_object_storage->listPrefix(restore_information.source_path, children); + restore_files(children); for (auto & result : results) @@ -1091,7 +1155,7 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc auto relative_key = shrinkKey(source_path, key); /// Copy object if we restore to different bucket / path. - if (disk->remote_fs_root_path != source_path) + if (source_object_storage->getObjectsNamespace() != disk->object_storage->getObjectsNamespace() || disk->remote_fs_root_path != source_path) source_object_storage->copyObjectToAnotherObjectStorage(key, disk->remote_fs_root_path + relative_key, *disk->object_storage); auto updater = [relative_key, meta] (DiskObjectStorage::Metadata & metadata) @@ -1107,6 +1171,14 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc } +void DiskObjectStorage::onFreeze(const String & path) +{ + createDirectories(path); + auto revision_file_buf = metadata_disk->writeFile(path + "revision.txt", 32); + writeIntText(metadata_helper->revision_counter.load(), *revision_file_buf); + revision_file_buf->finalize(); +} + static String pathToDetached(const String & source_path) { if (source_path.ends_with('/')) @@ -1150,16 +1222,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou auto to_path = object_attributes["to_path"]; if (disk->exists(from_path)) { - disk->moveFile(from_path, to_path); - if (send_metadata) - { - auto next_revision = ++revision_counter; - const ObjectAttributes object_metadata { - {"from_path", from_path}, - {"to_path", to_path} - }; - createFileOperationObject("rename", next_revision, object_attributes); - } + disk->moveFile(from_path, to_path, send_metadata); LOG_TRACE(disk->log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path); @@ -1187,16 +1250,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou if (disk->exists(src_path)) { disk->createDirectories(directoryPath(dst_path)); - if (send_metadata && !dst_path.starts_with("shadow/")) - { - auto next_revision = ++revision_counter; - const ObjectAttributes object_metadata { - {"src_path", src_path}, - {"dst_path", dst_path} - }; - createFileOperationObject("hardlink", next_revision, object_attributes); - } - disk->createHardLink(src_path, dst_path); + disk->createHardLink(src_path, dst_path, send_metadata); LOG_TRACE(disk->log, "Revision {}. Restored hardlink {} -> {}", revision, src_path, dst_path); } } diff --git a/src/Disks/DiskRestartProxy.cpp b/src/Disks/DiskRestartProxy.cpp index 8bb31cec55f..903caf705c5 100644 --- a/src/Disks/DiskRestartProxy.cpp +++ b/src/Disks/DiskRestartProxy.cpp @@ -6,8 +6,7 @@ namespace DB { namespace ErrorCodes -{ - extern const int DEADLOCK_AVOIDED; +{extern const int DEADLOCK_AVOIDED; } using Millis = std::chrono::milliseconds; @@ -329,7 +328,7 @@ void DiskRestartProxy::getRemotePathsRecursive(const String & path, std::vector< return DiskDecorator::getRemotePathsRecursive(path, paths_map); } -void DiskRestartProxy::restart() +void DiskRestartProxy::restart(ContextPtr context) { /// Speed up processing unhealthy requests. DiskDecorator::shutdown(); @@ -352,7 +351,7 @@ void DiskRestartProxy::restart() LOG_INFO(log, "Restart lock acquired. Restarting disk {}", DiskDecorator::getName()); - DiskDecorator::startup(); + DiskDecorator::startup(context); LOG_INFO(log, "Disk restarted {}", DiskDecorator::getName()); } diff --git a/src/Disks/DiskRestartProxy.h b/src/Disks/DiskRestartProxy.h index d30c2fdbbfb..084e06e3f18 100644 --- a/src/Disks/DiskRestartProxy.h +++ b/src/Disks/DiskRestartProxy.h @@ -68,7 +68,7 @@ public: std::vector getRemotePaths(const String & path) const override; void getRemotePathsRecursive(const String & path, std::vector & paths_map) override; - void restart(); + void restart(ContextPtr context); private: friend class RestartAwareReadBuffer; diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index 1071e1294b6..cf8b1a09ce9 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -297,7 +297,7 @@ public: virtual void shutdown() {} /// Performs action on disk startup. - virtual void startup() {} + virtual void startup(ContextPtr) {} /// Return some uniq string for file, overrode for IDiskRemote /// Required for distinguish different copies of the same part on remote disk diff --git a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp index dca2fb17ba7..77da60ca07d 100644 --- a/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp +++ b/src/Disks/IO/WriteIndirectBufferFromRemoteFS.cpp @@ -36,7 +36,8 @@ WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS() void WriteIndirectBufferFromRemoteFS::finalizeImpl() { WriteBufferFromFileDecorator::finalizeImpl(); - create_metadata_callback(count()); + if (create_metadata_callback) + create_metadata_callback(count()); } diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index 54b736788fa..b344375f05b 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -96,9 +96,7 @@ void registerDiskS3(DiskFactory & factory) checkRemoveAccess(*s3disk); } - s3disk->startup(); - - s3disk->restoreMetadataIfNeeded(config, config_prefix, context); + s3disk->startup(context); std::shared_ptr disk_result = s3disk; diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index b52645c7854..d49ab933f23 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -780,7 +780,7 @@ void InterpreterSystemQuery::restartDisk(String & name) auto disk = getContext()->getDisk(name); if (DiskRestartProxy * restart_proxy = dynamic_cast(disk.get())) - restart_proxy->restart(); + restart_proxy->restart(getContext()); else throw Exception("Disk " + name + " doesn't have possibility to restart", ErrorCodes::BAD_ARGUMENTS); }