diff --git a/src/Disks/DiskCacheWrapper.cpp b/src/Disks/DiskCacheWrapper.cpp index 7ce963380d4..89bab7cfa98 100644 --- a/src/Disks/DiskCacheWrapper.cpp +++ b/src/Disks/DiskCacheWrapper.cpp @@ -239,19 +239,6 @@ void DiskCacheWrapper::replaceFile(const String & from_path, const String & to_p DiskDecorator::replaceFile(from_path, to_path); } -void DiskCacheWrapper::copyFile(const String & from_path, const String & to_path) -{ - if (cache_disk->exists(from_path)) - { - auto dir_path = getDirectoryPath(to_path); - if (!cache_disk->exists(dir_path)) - cache_disk->createDirectories(dir_path); - - cache_disk->copyFile(from_path, to_path); - } - DiskDecorator::copyFile(from_path, to_path); -} - void DiskCacheWrapper::remove(const String & path) { if (cache_disk->exists(path)) diff --git a/src/Disks/DiskCacheWrapper.h b/src/Disks/DiskCacheWrapper.h index b0b373d900c..711ad5280ec 100644 --- a/src/Disks/DiskCacheWrapper.h +++ b/src/Disks/DiskCacheWrapper.h @@ -32,7 +32,6 @@ public: void moveDirectory(const String & from_path, const String & to_path) override; void moveFile(const String & from_path, const String & to_path) override; void replaceFile(const String & from_path, const String & to_path) override; - void copyFile(const String & from_path, const String & to_path) override; std::unique_ptr readFile(const String & path, size_t buf_size, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold) const override; std::unique_ptr diff --git a/src/Disks/DiskLocal.cpp b/src/Disks/DiskLocal.cpp index cde9b3c5a41..364b5bf4e2f 100644 --- a/src/Disks/DiskLocal.cpp +++ b/src/Disks/DiskLocal.cpp @@ -220,11 +220,6 @@ void DiskLocal::replaceFile(const String & from_path, const String & to_path) from_file.renameTo(to_file.path()); } -void DiskLocal::copyFile(const String & from_path, const String & to_path) -{ - Poco::File(disk_path + from_path).copyTo(disk_path + to_path); -} - std::unique_ptr DiskLocal::readFile(const String & path, size_t buf_size, size_t estimated_size, 
size_t aio_threshold, size_t mmap_threshold) const { diff --git a/src/Disks/DiskLocal.h b/src/Disks/DiskLocal.h index 762a8502faa..eac95c543ef 100644 --- a/src/Disks/DiskLocal.h +++ b/src/Disks/DiskLocal.h @@ -67,8 +67,6 @@ public: void replaceFile(const String & from_path, const String & to_path) override; - void copyFile(const String & from_path, const String & to_path) override; - void copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path) override; void listFiles(const String & path, std::vector & file_names) override; diff --git a/src/Disks/DiskMemory.cpp b/src/Disks/DiskMemory.cpp index d185263d48c..ef68ad19191 100644 --- a/src/Disks/DiskMemory.cpp +++ b/src/Disks/DiskMemory.cpp @@ -314,11 +314,6 @@ void DiskMemory::replaceFileImpl(const String & from_path, const String & to_pat files.insert(std::move(node)); } -void DiskMemory::copyFile(const String & /*from_path*/, const String & /*to_path*/) -{ - throw Exception("Method copyFile is not implemented for memory disks", ErrorCodes::NOT_IMPLEMENTED); -} - std::unique_ptr DiskMemory::readFile(const String & path, size_t /*buf_size*/, size_t, size_t, size_t) const { std::lock_guard lock(mutex); diff --git a/src/Disks/DiskMemory.h b/src/Disks/DiskMemory.h index 4d4b947098b..5c81051eaa4 100644 --- a/src/Disks/DiskMemory.h +++ b/src/Disks/DiskMemory.h @@ -60,8 +60,6 @@ public: void replaceFile(const String & from_path, const String & to_path) override; - void copyFile(const String & from_path, const String & to_path) override; - void listFiles(const String & path, std::vector & file_names) override; std::unique_ptr readFile( diff --git a/src/Disks/IDisk.h b/src/Disks/IDisk.h index ac0f5a2ae8f..d20c1327509 100644 --- a/src/Disks/IDisk.h +++ b/src/Disks/IDisk.h @@ -127,9 +127,6 @@ public: /// If a file with `to_path` path already exists, it will be replaced. 
virtual void replaceFile(const String & from_path, const String & to_path) = 0; - /// Copy the file from `from_path` to `to_path`. - virtual void copyFile(const String & from_path, const String & to_path) = 0; - /// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`. virtual void copy(const String & from_path, const std::shared_ptr & to_disk, const String & to_path); diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 4786c05f8b0..d4b2f43b70a 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -23,6 +23,8 @@ #include #include #include +#include +#include #include @@ -32,6 +34,7 @@ namespace DB namespace ErrorCodes { + extern const int S3_ERROR; extern const int FILE_ALREADY_EXISTS; extern const int CANNOT_SEEK_THROUGH_FILE; extern const int UNKNOWN_FORMAT; @@ -76,12 +79,12 @@ String getRandomName() } template -void throwIfError(Aws::Utils::Outcome && response) +void throwIfError(Aws::Utils::Outcome & response) { if (!response.IsSuccess()) { const auto & err = response.GetError(); - throw Exception(err.GetMessage(), static_cast(err.GetErrorType())); + throw Exception(std::to_string(static_cast(err.GetErrorType())) + ": " + err.GetMessage(), ErrorCodes::S3_ERROR); } } @@ -613,45 +616,31 @@ void DiskS3::moveFile(const String & from_path, const String & to_path) { if (exists(to_path)) throw Exception("File already exists: " + to_path, ErrorCodes::FILE_ALREADY_EXISTS); + + if (send_metadata) + { + auto revision = ++revision_counter; + const DiskS3::ObjectMetadata object_metadata { + {"from_path", from_path}, + {"to_path", to_path} + }; + createFileOperationObject("rename", revision, object_metadata); + } + Poco::File(metadata_path + from_path).renameTo(metadata_path + to_path); } void DiskS3::replaceFile(const String & from_path, const String & to_path) { - Poco::File from_file(metadata_path + from_path); - Poco::File to_file(metadata_path + to_path); - if (to_file.exists()) + if (exists(to_path)) { - 
Poco::File tmp_file(metadata_path + to_path + ".old"); - to_file.renameTo(tmp_file.path()); - from_file.renameTo(metadata_path + to_path); - remove(to_path + ".old"); + const String tmp_path = to_path + ".old"; + moveFile(to_path, tmp_path); + moveFile(from_path, to_path); + remove(tmp_path); } else - from_file.renameTo(to_file.path()); -} - -void DiskS3::copyFile(const String & from_path, const String & to_path) -{ - if (exists(to_path)) - remove(to_path); - - auto from = readMeta(from_path); - auto to = createMeta(to_path); - - for (const auto & [path, size] : from.s3_objects) - { - auto new_path = getRandomName(); - Aws::S3::Model::CopyObjectRequest req; - req.SetCopySource(bucket + "/" + s3_root_path + path); - req.SetBucket(bucket); - req.SetKey(s3_root_path + new_path); - throwIfError(client->CopyObject(req)); - - to.addObject(new_path, size); - } - - to.save(); + moveFile(from_path, to_path); } std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const @@ -673,7 +662,17 @@ std::unique_ptr DiskS3::writeFile(const String & path, /// Path to store new S3 object. auto s3_path = getRandomName(); - auto object_metadata = createObjectMetadata(path); + + std::optional object_metadata; + if (send_metadata) + { + auto revision = ++revision_counter; + object_metadata = { + {"path", path} + }; + s3_path = "r" + revisionToString(revision) + "-file-" + s3_path; + } + if (!exist || mode == WriteMode::Rewrite) { /// If metadata file exists - remove and create new. @@ -727,6 +726,15 @@ void DiskS3::removeMeta(const String & path, AwsS3KeyKeeper & keys) } else /// In other case decrement number of references, save metadata and delete file. 
{ + if (send_metadata) + { + auto revision = ++revision_counter; + const ObjectMetadata object_metadata { + {"path", path} + }; + createFileOperationObject("remove", revision, object_metadata); + } + --metadata.ref_count; metadata.save(); file.remove(); @@ -780,7 +788,8 @@ void DiskS3::removeAws(const AwsS3KeyKeeper & keys) Aws::S3::Model::DeleteObjectsRequest request; request.SetBucket(bucket); request.SetDelete(delkeys); - throwIfError(client->DeleteObjects(request)); + auto outcome = client->DeleteObjects(request); + throwIfError(outcome); } } } @@ -840,6 +849,16 @@ Poco::Timestamp DiskS3::getLastModified(const String & path) void DiskS3::createHardLink(const String & src_path, const String & dst_path) { + if (send_metadata) + { + auto revision = ++revision_counter; + const ObjectMetadata object_metadata { + {"src_path", src_path}, + {"dst_path", dst_path} + }; + createFileOperationObject("hardlink", revision, object_metadata); + } + /// Increment number of references. auto src = readMeta(src_path); ++src.ref_count; @@ -889,12 +908,257 @@ void DiskS3::shutdown() client->DisableRequestProcessing(); } -std::optional DiskS3::createObjectMetadata(const String & path) const +void DiskS3::createFileOperationObject(const String & operation_name, UInt64 revision, const DiskS3::ObjectMetadata & metadata) { - if (send_metadata) - return (DiskS3::ObjectMetadata){{"path", path}}; + const String key = "meta/r" + revisionToString(revision) + "-" + operation_name; + WriteBufferFromS3 buffer(client, bucket, s3_root_path + key, min_upload_part_size, max_single_part_upload_size, metadata); + buffer.write('0'); + buffer.finalize(); +} - return {}; +void DiskS3::startup() +{ + if (!send_metadata) + return; + + LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting up disk {}", name); + + /// Find last revision. 
+ UInt64 l = 0, r = (static_cast(1)) << 63; + while (r - l > 1) + { + auto revision = l + ((r - l) >> 1); + auto revision_str = revisionToString(revision); + /// Probe the midpoint between l and r: check whether an object or meta object with this revision exists. + if (checkObjectExists(s3_root_path + "r" + revision_str) + || checkObjectExists(s3_root_path + "meta/r" + revision_str)) + l = revision; + else + r = revision; + } + revision_counter = l; + LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {}", revision_counter); +} + +bool DiskS3::checkObjectExists(const String & prefix) +{ + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(bucket); + request.SetPrefix(prefix); + request.SetMaxKeys(1); + + auto outcome = client->ListObjectsV2(request); + throwIfError(outcome); + + return !outcome.GetResult().GetContents().empty(); +} + +struct DiskS3::RestoreInformation +{ + UInt64 revision = (static_cast(1)) << 63; + String bucket; + String path; +}; + +void DiskS3::restore() +{ + if (!exists(restore_file)) + return; + + RestoreInformation information; + /// TODO: read restore information from restore_file. 
+ + restoreFiles(information.bucket, information.path, information.revision); + restoreFileOperations(information.bucket, information.path, information.revision); +} + +Aws::S3::Model::HeadObjectResult DiskS3::headObject(const String & source_bucket, const String & key) +{ + Aws::S3::Model::HeadObjectRequest request; + request.SetBucket(source_bucket); + request.SetKey(key); + + auto outcome = client->HeadObject(request); + throwIfError(outcome); + + return outcome.GetResultWithOwnership(); +} + +void DiskS3::listObjects(const String & source_bucket, const String & source_path, std::function callback) +{ + Aws::S3::Model::ListObjectsV2Request request; + request.SetBucket(source_bucket); + request.SetPrefix(source_path); + request.SetMaxKeys(1000); + + Aws::S3::Model::ListObjectsV2Outcome outcome; + do + { + outcome = client->ListObjectsV2(request); + throwIfError(outcome); + + bool should_continue = callback(outcome.GetResult()); + + if (!should_continue) + break; + + request.SetContinuationToken(outcome.GetResult().GetNextContinuationToken()); + } while (outcome.GetResult().GetIsTruncated()); +} + +void DiskS3::restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision) +{ + LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore files for disk {}", name); + + std::vector> results; + + listObjects(source_bucket, source_path, [this, &source_bucket, &revision, &results](const auto & list_result) { + std::vector keys; + for (const auto & row : list_result.GetContents()) + { + const String & key = row.GetKey(); + + /// Skip meta objects. They will be processed separately. + if (key.find("/meta/") != String::npos) + continue; + + /// Filter early if it's possible to get revision from key. 
+ if (extractRevisionFromKey(key) > revision) + continue; + + keys.push_back(key); + } + + if (!keys.empty()) + { + auto result = getExecutor().execute([this, &source_bucket, keys]() { processRestoreFiles(source_bucket, keys); + }); + + results.push_back(std::move(result)); + } + + return true; + }); + + for (auto & result : results) + result.wait(); + for (auto & result : results) + result.get(); + + LOG_INFO(&Poco::Logger::get("DiskS3"), "Files are restored for disk {}", name); +} + +inline String getDirectoryPath(const String & path) +{ + return Poco::Path{path}.setFileName("").toString(); +} + +void DiskS3::processRestoreFiles(const String & source_bucket, Strings keys) +{ + for (const auto & key : keys) + { + Aws::S3::Model::HeadObjectRequest request; + request.SetBucket(source_bucket); + request.SetKey(key); + + auto outcome = client->HeadObject(request); + throwIfError(outcome); + + auto object_metadata = outcome.GetResult().GetMetadata(); + + /// If object has 'path' in metadata then restore it. + auto path = object_metadata.find("path"); + if (path == object_metadata.end()) + continue; + + createDirectories(getDirectoryPath(path->second)); + auto metadata = createMeta(path->second); + + /// TODO: shrink common prefix of s3_root_path and key. + auto relative_key = key; + metadata.addObject(relative_key, outcome.GetResult().GetContentLength()); + + /// TODO: Copy object to configured bucket if source_bucket is different. + + metadata.save(); + } +} + +void DiskS3::restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision) +{ + LOG_INFO(&Poco::Logger::get("DiskS3"), "Starting restore file operations for disk {}", name); + + /// Temporarily disable sending metadata. 
+ send_metadata = false; + + listObjects(source_bucket, source_path + "meta/", [this, &source_bucket, &revision](const auto & list_result) { + const String rename = "rename"; + const String remove = "remove"; + const String hardlink = "hardlink"; + + for (const auto & row : list_result.GetContents()) + { + const String & key = row.GetKey(); + + /// Stop processing once a revision greater than the requested one is reached. + /// S3 ensures that keys will be listed in ascending UTF-8 bytes order. + if (extractRevisionFromKey(key) > revision) + return false; + + auto operation = extractOperationFromKey(key); + auto object_metadata = headObject(source_bucket, key).GetMetadata(); + if (operation == rename) + { + auto from_path = object_metadata["from_path"]; + auto to_path = object_metadata["to_path"]; + if (exists(from_path)) + moveFile(from_path, to_path); + } + else if (operation == remove) + { + removeIfExists(object_metadata["path"]); + } + else if (operation == hardlink) + { + auto src_path = object_metadata["src_path"]; + auto dst_path = object_metadata["dst_path"]; + /// Skip hardlinks to shadow (backup) directory: recreate only links whose destination is outside of it. + if (exists(src_path) && dst_path.find("/shadow/") == String::npos) + createHardLink(src_path, dst_path); + } + } + + return true; + }); + + send_metadata = true; + + LOG_INFO(&Poco::Logger::get("DiskS3"), "File operations restored for disk {}", name); +} + +UInt64 DiskS3::extractRevisionFromKey(const String & /*key*/) +{ + /// TODO: Implement. + return 0; +} + +String DiskS3::extractOperationFromKey(const String & /*key*/) +{ + /// TODO: Implement. + return ""; +} + +String DiskS3::revisionToString(UInt64 revision) +{ + static constexpr size_t max_digits = 19; + + /// Align revision number with leading zeroes to have strict lexicographical order of them. 
+ auto revision_str = std::to_string(revision); + /// Pad only when shorter than max_digits, so the unsigned subtraction below can never wrap around. + if (revision_str.length() < max_digits) + revision_str.insert(0, max_digits - revision_str.length(), '0'); + + return revision_str; } } diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h index f62c603adda..dfaa3136642 100644 --- a/src/Disks/S3/DiskS3.h +++ b/src/Disks/S3/DiskS3.h @@ -1,10 +1,14 @@ #pragma once +#include #include "Disks/DiskFactory.h" #include "Disks/Executor.h" #include "ProxyConfiguration.h" #include +#include +#include + #include @@ -19,12 +23,16 @@ namespace DB class DiskS3 : public IDisk { public: + /// Name of the file whose presence makes restore() run; it is meant to hold the restore information. + const String restore_file = "restore"; + using ObjectMetadata = std::map; friend class DiskS3Reservation; class AwsS3KeyKeeper; struct Metadata; + struct RestoreInformation; DiskS3( String name_, @@ -74,8 +82,6 @@ public: void replaceFile(const String & from_path, const String & to_path) override; - void copyFile(const String & from_path, const String & to_path) override; - void listFiles(const String & path, std::vector & file_names) override; std::unique_ptr readFile( @@ -114,17 +120,34 @@ public: void shutdown() override; + /// Actions performed after disk creation. + void startup(); + + /// Restore S3 metadata files on file system. 
+ void restore(); + private: bool tryReserve(UInt64 bytes); void removeMeta(const String & path, AwsS3KeyKeeper & keys); void removeMetaRecursive(const String & path, AwsS3KeyKeeper & keys); void removeAws(const AwsS3KeyKeeper & keys); - std::optional createObjectMetadata(const String & path) const; Metadata readMeta(const String & path) const; Metadata createMeta(const String & path) const; + void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata); + String revisionToString(UInt64 revision); + bool checkObjectExists(const String & prefix); + + Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key); + void listObjects(const String & source_bucket, const String & source_path, std::function callback); + void restoreFiles(const String & source_bucket, const String & source_path, UInt64 revision); + void processRestoreFiles(const String & source_bucket, std::vector keys); + void restoreFileOperations(const String & source_bucket, const String & source_path, UInt64 revision); + UInt64 extractRevisionFromKey(const String & key); + String extractOperationFromKey(const String & key); + private: const String name; std::shared_ptr client; @@ -140,6 +163,8 @@ private: UInt64 reserved_bytes = 0; UInt64 reservation_count = 0; std::mutex reservation_mutex; + + std::atomic revision_counter{0}; }; }