From 8128ca10f436871c674c51b0a0ae74e9c88f0485 Mon Sep 17 00:00:00 2001 From: Vladimir Chebotarev Date: Tue, 23 Jun 2020 19:00:15 +0300 Subject: [PATCH] Switched paths in S3 metadata to relative. --- src/Disks/S3/DiskS3.cpp | 66 +++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 25 deletions(-) diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp index 71b5991f770..873b54353ad 100644 --- a/src/Disks/S3/DiskS3.cpp +++ b/src/Disks/S3/DiskS3.cpp @@ -20,6 +20,9 @@ #include #include +#include + + namespace DB { @@ -60,10 +63,14 @@ namespace struct Metadata { /// Metadata file version. - static constexpr UInt32 VERSION = 1; + static constexpr UInt32 VERSION_ABSOLUTE_PATHS = 1; + static constexpr UInt32 VERSION_RELATIVE_PATHS = 2; using PathAndSize = std::pair; + /// S3 root path. + const String & s3_root_path; + /// Disk path. const String & disk_path; /// Relative path to metadata file on local FS. @@ -76,8 +83,8 @@ namespace UInt32 ref_count; /// Load metadata by path or create empty if `create` flag is set. - explicit Metadata(const String & disk_path_, const String & metadata_file_path_, bool create = false) - : disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0) + explicit Metadata(const String & s3_root_path_, const String & disk_path_, const String & metadata_file_path_, bool create = false) + : s3_root_path(s3_root_path_), disk_path(disk_path_), metadata_file_path(metadata_file_path_), total_size(0), s3_objects(0), ref_count(0) { if (create) return; @@ -87,10 +94,10 @@ namespace UInt32 version; readIntText(version, buf); - if (version != VERSION) + if (version != VERSION_RELATIVE_PATHS && version != VERSION_ABSOLUTE_PATHS) throw Exception( "Unknown metadata file version. Path: " + disk_path + metadata_file_path - + " Version: " + std::to_string(version) + ", Expected version: " + std::to_string(VERSION), + + " Version: " + std::to_string(version) + ", Maximum expected version: " + std::to_string(VERSION_RELATIVE_PATHS), ErrorCodes::UNKNOWN_FORMAT); assertChar('\n', buf); @@ -108,6 +115,15 @@ namespace readIntText(s3_object_size, buf); assertChar('\t', buf); readEscapedString(s3_object_path, buf); + if (version == VERSION_ABSOLUTE_PATHS) + { + if (!boost::algorithm::starts_with(s3_object_path, s3_root_path)) + throw Exception( + "Path in metadata does not correspond S3 root path. Path: " + s3_object_path + + ", root path: " + s3_root_path + ", disk path: " + disk_path_, + ErrorCodes::UNKNOWN_FORMAT); + s3_object_path = s3_object_path.substr(s3_root_path.size()); + } assertChar('\n', buf); s3_objects[i] = {s3_object_path, s3_object_size}; } @@ -127,7 +143,7 @@ namespace { WriteBufferFromFile buf(disk_path + metadata_file_path, 1024); - writeIntText(VERSION, buf); + writeIntText(VERSION_RELATIVE_PATHS, buf); writeChar('\n', buf); writeIntText(s3_objects.size(), buf); @@ -213,7 +229,7 @@ namespace const auto & [path, size] = metadata.s3_objects[i]; if (size > offset) { - auto buf = std::make_unique(client_ptr, bucket, path, buf_size); + auto buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); buf->seek(offset, SEEK_SET); return buf; } @@ -242,7 +258,7 @@ namespace ++current_buf_idx; const auto & path = metadata.s3_objects[current_buf_idx].first; - current_buf = std::make_unique(client_ptr, bucket, path, buf_size); + current_buf = std::make_unique(client_ptr, bucket, metadata.s3_root_path + path, buf_size); current_buf->next(); working_buffer = current_buf->buffer(); absolute_position += working_buffer.size(); @@ -272,7 +288,7 @@ namespace size_t min_upload_part_size, size_t buf_size_) : WriteBufferFromFileBase(buf_size_, nullptr, 0) - , impl(WriteBufferFromS3(client_ptr_, bucket_, s3_path_, min_upload_part_size, buf_size_)) + , impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, buf_size_)) , metadata(std::move(metadata_)) , s3_path(s3_path_) { @@ -440,7 +456,7 @@ bool DiskS3::isDirectory(const String & path) const size_t DiskS3::getFileSize(const String & path) const { - Metadata metadata(metadata_path, path); + Metadata metadata(s3_root_path, metadata_path, path); return metadata.total_size; } @@ -493,16 +509,16 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) if (exists(to_path)) remove(to_path); - Metadata from(metadata_path, from_path); - Metadata to(metadata_path, to_path, true); + Metadata from(s3_root_path, metadata_path, from_path); + Metadata to(s3_root_path, metadata_path, to_path, true); for (const auto & [path, size] : from.s3_objects) { - auto new_path = s3_root_path + getRandomName(); + auto new_path = getRandomName(); Aws::S3::Model::CopyObjectRequest req; - req.SetCopySource(bucket + "/" + path); + req.SetCopySource(bucket + "/" + s3_root_path + path); req.SetBucket(bucket); - req.SetKey(new_path); + req.SetKey(s3_root_path + new_path); throwIfError(client->CopyObject(req)); to.addObject(new_path, size); @@ -513,7 +529,7 @@ void DiskS3::copyFile(const String & from_path, const String & to_path) std::unique_ptr DiskS3::readFile(const String & path, size_t buf_size, size_t, size_t, size_t) const { - Metadata metadata(metadata_path, path); + Metadata metadata(s3_root_path, metadata_path, path); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Read from file by path: {}. Existing S3 objects: {}", backQuote(metadata_path + path), metadata.s3_objects.size()); @@ -525,27 +541,27 @@ std::unique_ptr DiskS3::writeFile(const String & path, { bool exist = exists(path); /// Path to store new S3 object. - auto s3_path = s3_root_path + getRandomName(); + auto s3_path = getRandomName(); if (!exist || mode == WriteMode::Rewrite) { /// If metadata file exists - remove and create new. if (exist) remove(path); - Metadata metadata(metadata_path, path, true); + Metadata metadata(s3_root_path, metadata_path, path, true); /// Save empty metadata to disk to have ability to get file size while buffer is not finalized. metadata.save(); - LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_path); + LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path); return std::make_unique(client, bucket, metadata, s3_path, min_upload_part_size, buf_size); } else { - Metadata metadata(metadata_path, path); + Metadata metadata(s3_root_path, metadata_path, path); LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.", - backQuote(metadata_path + path), s3_path, metadata.s3_objects.size()); + backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size()); return std::make_unique(client, bucket, metadata, s3_path, min_upload_part_size, buf_size); } @@ -558,7 +574,7 @@ void DiskS3::remove(const String & path) Poco::File file(metadata_path + path); if (file.isFile()) { - Metadata metadata(metadata_path, path); + Metadata metadata(s3_root_path, metadata_path, path); /// If there is no references - delete content from S3. if (metadata.ref_count == 0) @@ -569,7 +585,7 @@ void DiskS3::remove(const String & path) /// TODO: Make operation idempotent. Do not throw exception if key is already deleted. Aws::S3::Model::DeleteObjectRequest request; request.SetBucket(bucket); - request.SetKey(s3_object_path); + request.SetKey(s3_root_path + s3_object_path); throwIfError(client->DeleteObject(request)); } } @@ -644,7 +660,7 @@ Poco::Timestamp DiskS3::getLastModified(const String & path) void DiskS3::createHardLink(const String & src_path, const String & dst_path) { /// Increment number of references. - Metadata src(metadata_path, src_path); + Metadata src(s3_root_path, metadata_path, src_path); ++src.ref_count; src.save(); @@ -655,7 +671,7 @@ void DiskS3::createHardLink(const String & src_path, const String & dst_path) void DiskS3::createFile(const String & path) { /// Create empty metadata file. - Metadata metadata(metadata_path, path, true); + Metadata metadata(s3_root_path, metadata_path, path, true); metadata.save(); }