From c7b16065e14b4e7f5c4179bc0262407ae4cbec6e Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 25 May 2022 21:47:05 +0200 Subject: [PATCH] Merge with master --- src/Common/ErrorCodes.cpp | 2 +- .../ObjectStorages/DiskObjectStorage.cpp | 11 +++++ src/Disks/ObjectStorages/DiskObjectStorage.h | 4 ++ .../DiskObjectStorageMetadataHelper.cpp | 41 ++++++++++++++--- .../DiskObjectStorageMetadataHelper.h | 45 ++++++++++++++----- src/Disks/ObjectStorages/S3/diskSettings.cpp | 9 ++-- .../ObjectStorages/S3/registerDiskS3.cpp | 1 + src/Storages/MergeTree/DataPartsExchange.cpp | 2 +- src/Storages/StorageS3.cpp | 6 ++- src/Storages/StorageS3.h | 7 ++- 10 files changed, 103 insertions(+), 25 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index ce457cda1f2..973dde10756 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -627,8 +627,8 @@ M(656, MEILISEARCH_EXCEPTION) \ M(657, UNSUPPORTED_MEILISEARCH_TYPE) \ M(658, MEILISEARCH_MISSING_SOME_COLUMNS) \ - M(659, HDFS_ERROR) \ M(659, UNKNOWN_STATUS_OF_TRANSACTION) \ + M(660, HDFS_ERROR) \ \ M(999, KEEPER_EXCEPTION) \ M(1000, POCO_EXCEPTION) \ diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.cpp b/src/Disks/ObjectStorages/DiskObjectStorage.cpp index c235e1a864a..65b1d5a5bdf 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorage.cpp @@ -622,6 +622,17 @@ void DiskObjectStorage::restoreMetadataIfNeeded(const Poco::Util::AbstractConfig } } +void DiskObjectStorage::syncRevision(UInt64 revision) +{ + metadata_helper->syncRevision(revision); +} + +UInt64 DiskObjectStorage::getRevision() const +{ + return metadata_helper->getRevision(); +} + + DiskPtr DiskObjectStorageReservation::getDisk(size_t i) const { if (i != 0) diff --git a/src/Disks/ObjectStorages/DiskObjectStorage.h b/src/Disks/ObjectStorages/DiskObjectStorage.h index 9a60a7ad25e..d89c00a5567 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorage.h +++ b/src/Disks/ObjectStorages/DiskObjectStorage.h @@ -172,6 +172,10 @@ public: void restoreMetadataIfNeeded(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); void onFreeze(const String & path) override; + + void syncRevision(UInt64 revision) override; + + UInt64 getRevision() const override; private: const String name; const String remote_fs_root_path; diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp index a7e34f7ccd4..b09debf9a43 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.cpp @@ -23,7 +23,7 @@ static String revisionToString(UInt64 revision) void DiskObjectStorageMetadataHelper::createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const { - const String path = disk->remote_fs_root_path + "operations/r" + revisionToString(revision) + "-" + operation_name; + const String path = disk->remote_fs_root_path + "operations/r" + revisionToString(revision) + operation_log_suffix + "-" + operation_name; auto buf = disk->object_storage->writeObject(path, WriteMode::Rewrite, metadata); buf->write('0'); buf->finalize(); @@ -300,15 +300,45 @@ static String shrinkKey(const String & path, const String & key) static std::tuple extractRevisionAndOperationFromKey(const String & key) { String revision_str; + String suffix; String operation; - /// Key has format: ../../r{revision}-{operation} - static const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+)$"}; + /// Key has format: ../../r{revision}(-{hostname})-{operation} + static const re2::RE2 key_regexp{".*/r(\\d+)(-[\\w\\d\\-\\.]+)?-(\\w+)$"}; - re2::RE2::FullMatch(key, key_regexp, &revision_str, &operation); + re2::RE2::FullMatch(key, key_regexp, &revision_str, &suffix, &operation); return {(revision_str.empty() ? 0 : static_cast(std::bitset<64>(revision_str).to_ullong())), operation}; } +void DiskObjectStorageMetadataHelper::moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata) +{ + if (disk->exists(to_path)) + { + if (send_metadata) + { + auto revision = ++revision_counter; + const ObjectAttributes object_metadata { + {"from_path", from_path}, + {"to_path", to_path} + }; + createFileOperationObject("rename", revision, object_metadata); + } + if (disk->isDirectory(from_path)) + { + for (auto it = disk->iterateDirectory(from_path); it->isValid(); it->next()) + moveRecursiveOrRemove(it->path(), fs::path(to_path) / it->name(), false); + } + else + { + disk->removeFile(from_path); + } + } + else + { + disk->moveFile(from_path, to_path, send_metadata); + } +} + void DiskObjectStorageMetadataHelper::restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information) { LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name); @@ -385,7 +415,6 @@ void DiskObjectStorageMetadataHelper::processRestoreFiles(IObjectStorage * sourc else continue; - disk->createDirectories(directoryPath(path)); auto relative_key = shrinkKey(source_path, key); @@ -457,7 +486,7 @@ void DiskObjectStorageMetadataHelper::restoreFileOperations(IObjectStorage * sou auto to_path = object_attributes["to_path"]; if (disk->exists(from_path)) { - disk->moveFile(from_path, to_path, send_metadata); + moveRecursiveOrRemove(from_path, to_path, send_metadata); LOG_TRACE(disk->log, "Revision {}. Restored rename {} -> {}", revision, from_path, to_path); diff --git a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h index 89153e4a39c..58ef8405a13 100644 --- a/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h +++ b/src/Disks/ObjectStorages/DiskObjectStorageMetadataHelper.h @@ -1,6 +1,7 @@ #pragma once #include +#include namespace DB { @@ -25,9 +26,37 @@ public: DiskObjectStorageMetadataHelper(DiskObjectStorage * disk_, ReadSettings read_settings_) : disk(disk_) , read_settings(std::move(read_settings_)) + , operation_log_suffix("-" + getFQDNOrHostName()) { } + /// Most important method, called on DiskObjectStorage startup + void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); + + void syncRevision(UInt64 revision) + { + UInt64 local_revision = revision_counter.load(); + while ((revision > local_revision) && revision_counter.compare_exchange_weak(local_revision, revision)); + } + + UInt64 getRevision() const + { + return revision_counter.load(); + } + + static int readSchemaVersion(IObjectStorage * object_storage, const String & source_path); + + void migrateToRestorableSchema(); + + void findLastRevision(); + + void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const; + + /// Version with possibility to backup-restore metadata. + static constexpr int RESTORABLE_SCHEMA_VERSION = 1; + + std::atomic revision_counter = 0; +private: struct RestoreInformation { UInt64 revision = LATEST_REVISION; @@ -38,32 +67,24 @@ public: using Futures = std::vector>; - void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectAttributes & metadata) const; + /// Move file or files in directory when possible and remove files in other case + /// to restore by S3 operation log with same operations from different replicas + void moveRecursiveOrRemove(const String & from_path, const String & to_path, bool send_metadata); - void findLastRevision(); - - static int readSchemaVersion(IObjectStorage * object_storage, const String & source_path); void saveSchemaVersion(const int & version) const; void updateObjectMetadata(const String & key, const ObjectAttributes & metadata) const; void migrateFileToRestorableSchema(const String & path) const; void migrateToRestorableSchemaRecursive(const String & path, Futures & results); - void migrateToRestorableSchema(); - - /// Most important method, called on DiskObjectStorage startup - void restore(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context); void readRestoreInformation(RestoreInformation & restore_information); void restoreFiles(IObjectStorage * source_object_storage, const RestoreInformation & restore_information); void processRestoreFiles(IObjectStorage * source_object_storage, const String & source_path, const std::vector & keys) const; void restoreFileOperations(IObjectStorage * source_object_storage, const RestoreInformation & restore_information); - std::atomic revision_counter = 0; inline static const String RESTORE_FILE_NAME = "restore"; /// Object contains information about schema version. inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION"; - /// Version with possibility to backup-restore metadata. - static constexpr int RESTORABLE_SCHEMA_VERSION = 1; /// Directories with data. const std::vector data_roots {"data", "store"}; @@ -72,6 +93,8 @@ public: ObjectStoragePtr object_storage_from_another_namespace; ReadSettings read_settings; + + String operation_log_suffix; }; } diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index 145bb4a3d66..79a7978c53e 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -14,9 +14,9 @@ #include #include #include -#include -#include -#include +#include +#include +#include #include #include #include @@ -149,4 +149,5 @@ std::unique_ptr getClient(const Poco::Util::AbstractConfigura } } ->>>>>> master:src/Disks/S3/registerDiskS3.cpp + +#endif diff --git a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp index 9c9c76ad451..d7e82ef3392 100644 --- a/src/Disks/ObjectStorages/S3/registerDiskS3.cpp +++ b/src/Disks/ObjectStorages/S3/registerDiskS3.cpp @@ -10,6 +10,7 @@ #include +#include #include #include diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 78896d74d09..0c834564ec4 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -518,7 +518,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart( if (!disk) disk = reservation->getDisk(); - UInt64 revision = parse(in.getResponseCookie("disk_revision", "0")); + UInt64 revision = parse(in->getResponseCookie("disk_revision", "0")); if (revision) disk->syncRevision(revision); diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 7960b7dfac0..1dbf7b36f1b 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -239,7 +239,11 @@ private: }; StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator( - const Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context) + const Aws::S3::S3Client & client_, + const S3::URI & globbed_uri_, + ASTPtr query, + const Block & virtual_header, + ContextPtr context) : pimpl(std::make_shared(client_, globbed_uri_, query, virtual_header, context)) { } diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index ef16982ba58..b246de18bfb 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -36,7 +36,12 @@ public: { public: DisclosedGlobIterator( - const Aws::S3::S3Client & client_, const S3::URI & globbed_uri_, ASTPtr query, const Block & virtual_header, ContextPtr context); + const Aws::S3::S3Client & client_, + const S3::URI & globbed_uri_, + ASTPtr query, + const Block & virtual_header, + ContextPtr context); + String next(); private: