diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp
index 4529f34e2a7..aa667fde06f 100644
--- a/src/Backups/BackupIO_S3.cpp
+++ b/src/Backups/BackupIO_S3.cpp
@@ -156,10 +156,9 @@ void BackupWriterS3::copyObjectImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    const Aws::S3::Model::HeadObjectResult & head,
+    size_t size,
     const std::optional<ObjectAttributes> & metadata) const
 {
-    size_t size = head.GetContentLength();
     LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
 
     Aws::S3::Model::CopyObjectRequest request;
@@ -177,7 +176,7 @@ void BackupWriterS3::copyObjectImpl(
     if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
         || outcome.GetError().GetExceptionName() == "InvalidRequest"))
     { // Can't come here with MinIO, MinIO allows single part upload for large objects.
-        copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
+        copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
         return;
     }
 
@@ -191,10 +190,9 @@ void BackupWriterS3::copyObjectMultipartImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    const Aws::S3::Model::HeadObjectResult & head,
+    size_t size,
     const std::optional<ObjectAttributes> & metadata) const
 {
-    size_t size = head.GetContentLength();
     LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
 
     String multipart_upload_id;
@@ -309,16 +307,16 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
         std::string source_bucket = object_storage->getObjectsNamespace();
         auto file_path = fs::path(s3_uri.key) / file_name_to;
 
-        auto head = S3::headObject(*client, source_bucket, objects[0].absolute_path).GetResult();
-        if (static_cast<size_t>(head.GetContentLength()) < request_settings.getUploadSettings().max_single_operation_copy_size)
+        auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
+        if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
         {
             copyObjectImpl(
-                source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
+                source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
         }
         else
         {
             copyObjectMultipartImpl(
-                source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
+                source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
         }
     }
 }
diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h
index 634b35c1e74..70487717c48 100644
--- a/src/Backups/BackupIO_S3.h
+++ b/src/Backups/BackupIO_S3.h
@@ -67,7 +67,7 @@ private:
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        const Aws::S3::Model::HeadObjectResult & head,
+        size_t size,
         const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
 
     void copyObjectMultipartImpl(
@@ -75,7 +75,7 @@
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        const Aws::S3::Model::HeadObjectResult & head,
+        size_t size,
         const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
 
     void removeFilesBatch(const Strings & file_names);
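With the `HeadObjectResult` parameter gone, the backup writer threads a plain byte count into both copy paths. Below is a minimal, self-contained sketch of the dispatch logic; the helper names are hypothetical, and the 5 GiB constant mirrors S3's documented ceiling for a single CopyObject call, which is what `max_single_operation_copy_size` guards against:

```cpp
#include <cstddef>
#include <cstdio>
#include <string>

// Assumed threshold: S3 rejects a single CopyObject above 5 GiB, hence the fallback.
constexpr size_t max_single_operation_copy_size = 5ULL * 1024 * 1024 * 1024;

// Stubs standing in for the real copy implementations.
void singleOperationCopy(const std::string & key, size_t size)
{
    std::printf("CopyObject %s (%zu bytes)\n", key.c_str(), size);
}

void multipartCopy(const std::string & key, size_t size)
{
    std::printf("UploadPartCopy loop for %s (%zu bytes)\n", key.c_str(), size);
}

// After the refactoring, a size_t is all the dispatch needs; no HeadObjectResult
// has to be fetched or kept alive by the caller.
void copyFileNativeSketch(const std::string & key, size_t size)
{
    if (size < max_single_operation_copy_size)
        singleOperationCopy(key, size);
    else
        multipartCopy(key, size);
}
```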
"Number of S3 API GetObjectAttributes calls.") \ + M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \ M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \ M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \ M(S3UploadPart, "Number of S3 API UploadPart calls.") \ @@ -321,6 +323,8 @@ The server successfully detected this situation and will download merged part fr M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \ M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \ M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \ + M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \ + M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \ M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \ M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \ M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \ diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp index 3c620ca819e..1868d558b5c 100644 --- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp +++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp @@ -125,9 +125,9 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path getRandomASCIIString(key_name_total_size - key_name_prefix_size)); } -Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const +size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const { - return S3::headObject(*client.get(), bucket_from, key, "", true); + return S3::getObjectSize(*client.get(), bucket_from, key, "", true); } bool S3ObjectStorage::exists(const StoredObject & object) const @@ -135,6 +135,11 @@ bool S3ObjectStorage::exists(const StoredObject & object) const return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true); } +std::pair S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key) const +{ + return S3::checkObjectExists(*client.get(), bucket_from, key, "", true); +} + std::unique_ptr S3ObjectStorage::readObjects( /// NOLINT const StoredObjects & objects, const ReadSettings & read_settings, @@ -409,13 +414,10 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons { ObjectMetadata result; - auto object_head = requestObjectHeadData(bucket, path); - throwIfError(object_head); - - auto & object_head_result = object_head.GetResult(); - result.size_bytes = object_head_result.GetContentLength(); - result.last_modified = object_head_result.GetLastModified().Millis(); - result.attributes = object_head_result.GetMetadata(); + auto object_info = S3::getObjectInfo(*client.get(), bucket, path, "", true, true); + result.size_bytes = object_info.size; + result.last_modified = object_info.last_modification_time; + result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, "", true, true); return result; } @@ -442,7 +444,7 @@ void S3ObjectStorage::copyObjectImpl( const String & src_key, const String & dst_bucket, const String & dst_key, - std::optional head, + size_t size, std::optional metadata) const { auto client_ptr = client.get(); @@ -464,7 +466,7 @@ void S3ObjectStorage::copyObjectImpl( if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge" || 
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
index 3c620ca819e..1868d558b5c 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.cpp
@@ -125,9 +125,9 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
         getRandomASCIIString(key_name_total_size - key_name_prefix_size));
 }
 
-Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
+size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const
 {
-    return S3::headObject(*client.get(), bucket_from, key, "", true);
+    return S3::getObjectSize(*client.get(), bucket_from, key, "", true);
 }
 
 bool S3ObjectStorage::exists(const StoredObject & object) const
@@ -135,6 +135,11 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
     return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true);
 }
 
+std::pair<bool, Aws::S3::S3Error> S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key) const
+{
+    return S3::checkObjectExists(*client.get(), bucket_from, key, "", true);
+}
+
 std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
     const StoredObjects & objects,
     const ReadSettings & read_settings,
@@ -409,13 +414,10 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
 {
     ObjectMetadata result;
 
-    auto object_head = requestObjectHeadData(bucket, path);
-    throwIfError(object_head);
-
-    auto & object_head_result = object_head.GetResult();
-    result.size_bytes = object_head_result.GetContentLength();
-    result.last_modified = object_head_result.GetLastModified().Millis();
-    result.attributes = object_head_result.GetMetadata();
+    auto object_info = S3::getObjectInfo(*client.get(), bucket, path, "", true, true);
+    result.size_bytes = object_info.size;
+    result.last_modified = object_info.last_modification_time;
+    result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, "", true, true);
 
     return result;
 }
@@ -442,7 +444,7 @@ void S3ObjectStorage::copyObjectImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    std::optional<Aws::S3::Model::HeadObjectResult> head,
+    size_t size,
     std::optional<ObjectAttributes> metadata) const
 {
     auto client_ptr = client.get();
@@ -464,7 +466,7 @@ void S3ObjectStorage::copyObjectImpl(
     if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
         || outcome.GetError().GetExceptionName() == "InvalidRequest"))
     { // Can't come here with MinIO, MinIO allows single part upload for large objects.
-        copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
+        copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
         return;
     }
 
@@ -473,8 +475,8 @@
     auto settings_ptr = s3_settings.get();
     if (settings_ptr->request_settings.check_objects_after_upload)
     {
-        auto object_head = requestObjectHeadData(dst_bucket, dst_key);
-        if (!object_head.IsSuccess())
+        auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
+        if (!exists)
             throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
     }
 }
@@ -485,15 +487,11 @@ void S3ObjectStorage::copyObjectMultipartImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    std::optional<Aws::S3::Model::HeadObjectResult> head,
+    size_t size,
     std::optional<ObjectAttributes> metadata) const
 {
-    if (!head)
-        head = requestObjectHeadData(src_bucket, src_key).GetResult();
-
     auto settings_ptr = s3_settings.get();
     auto client_ptr = client.get();
-    size_t size = head->GetContentLength();
 
     String multipart_upload_id;
@@ -570,8 +568,8 @@ void S3ObjectStorage::copyObjectMultipartImpl(
 
     if (settings_ptr->request_settings.check_objects_after_upload)
     {
-        auto object_head = requestObjectHeadData(dst_bucket, dst_key);
-        if (!object_head.IsSuccess())
+        auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
+        if (!exists)
             throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
     }
 }
@@ -580,18 +578,18 @@
 void S3ObjectStorage::copyObject( // NOLINT
     const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
 {
-    auto head = requestObjectHeadData(bucket, object_from.absolute_path).GetResult();
+    auto size = getObjectSize(bucket, object_from.absolute_path);
     static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
 
-    if (head.GetContentLength() >= multipart_upload_threashold)
+    if (size >= multipart_upload_threashold)
     {
         copyObjectMultipartImpl(
-            bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
+            bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
     }
     else
     {
         copyObjectImpl(
-            bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
+            bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
     }
 }
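The `check_objects_after_upload` branches now destructure a `{exists, error}` pair instead of interrogating a `HeadObjectOutcome`. A self-contained sketch of that call-site shape, with stand-in types in place of the AWS SDK ones:

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Stand-in for Aws::S3::S3Error, reduced to what the sketch needs.
struct S3ErrorSketch
{
    std::string message;
};

// Dummy implementation: in the real code this issues a GetObjectAttributes request.
std::pair<bool, S3ErrorSketch> checkObjectExists(const std::string &, const std::string &)
{
    return {true, {}};
}

// Post-upload verification via structured bindings, as in copyObjectImpl above.
void verifyUpload(const std::string & bucket, const std::string & key)
{
    auto [exists, error] = checkObjectExists(bucket, key);
    if (!exists)
        throw std::runtime_error(
            "Object " + key + " from bucket " + bucket
            + " disappeared immediately after upload: " + error.message);
}
```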
diff --git a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
index 0a07639e253..bfcfd746e07 100644
--- a/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
+++ b/src/Disks/ObjectStorages/S3/S3ObjectStorage.h
@@ -172,7 +172,7 @@ private:
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
+        size_t size,
         std::optional<ObjectAttributes> metadata = std::nullopt) const;
 
     void copyObjectMultipartImpl(
@@ -180,13 +180,14 @@
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
+        size_t size,
         std::optional<ObjectAttributes> metadata = std::nullopt) const;
 
     void removeObjectImpl(const StoredObject & object, bool if_exists);
     void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
 
-    Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
+    size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
+    std::pair<bool, Aws::S3::S3Error> checkObjectExists(const std::string & bucket_from, const std::string & key) const;
 
     std::string bucket;
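A side benefit of passing `size_t` and returning `std::pair<bool, Aws::S3::S3Error>` is that the header stops depending on AWS model result types, so a forward declaration can stand in for heavy SDK includes. An illustrative mini-header (hypothetical names, not the real class) showing the pattern:

```cpp
#include <cstddef>
#include <string>
#include <utility>

// A forward declaration suffices at declaration time; the full definition is only
// needed in the .cpp that actually inspects the error.
namespace sdk { class Error; } // stand-in for Aws::S3::S3Error

class ObjectStorageSketch
{
public:
    size_t getObjectSize(const std::string & bucket, const std::string & key) const;
    std::pair<bool, sdk::Error> checkObjectExists(const std::string & bucket, const std::string & key) const;

private:
    // Plain size_t in the private copy API instead of an SDK result object.
    void copyObjectImpl(const std::string & src_key, const std::string & dst_key, size_t size) const;
};
```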
HTTP response code: {}", error.GetMessage(), static_cast(error.GetResponseCode())); } return {}; @@ -936,12 +944,11 @@ namespace S3 bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3) { - auto outcome = headObject(client, bucket, key, version_id, for_disk_s3); + auto [exists, error] = checkObjectExists(client, bucket, key, version_id, for_disk_s3); - if (outcome.IsSuccess()) + if (exists) return true; - const auto & error = outcome.GetError(); if (isNotFoundError(error.GetErrorType())) return false; @@ -949,6 +956,47 @@ namespace S3 "Failed to check existence of key {} in bucket {}: {}", key, bucket, error.GetMessage()); } + + std::pair checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3) + { + auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3); + if (outcome.IsSuccess()) + return {true, {}}; + return {false, std::move(outcome.GetError())}; + } + + std::map getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3) + { + ProfileEvents::increment(ProfileEvents::S3GetObjectMetadata); + if (for_disk_s3) + ProfileEvents::increment(ProfileEvents::DiskS3GetObjectMetadata); + + /// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h. + + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(bucket); + req.SetKey(key); + + /// Only the first byte will be read. + /// We don't need that first byte but the range should be set otherwise the entire object will be read. + req.SetRange("bytes=0-0"); + + if (!version_id.empty()) + req.SetVersionId(version_id); + + auto outcome = client.GetObject(req); + + if (outcome.IsSuccess()) + return outcome.GetResult().GetMetadata(); + + if (!throw_on_error) + return {}; + + const auto & error = outcome.GetError(); + throw S3Exception(error.GetErrorType(), + "Failed to get metadata of key {} in bucket {}: {}", + key, bucket, error.GetMessage()); + } } } diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index f0844c05abc..081fbab777e 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -11,15 +11,15 @@ #if USE_AWS_S3 #include -#include -#include -#include -#include -#include - #include #include +#include +#include +#include + + +namespace Aws::S3 { class S3Client; } namespace DB { @@ -121,22 +121,36 @@ struct URI static void validateBucket(const String & bucket, const Poco::URI & uri); }; +/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead. +/// Explanation: The `HeadObject` request never returns a response body (even if there is an error) however +/// if the request was sent without specifying a region in the endpoint (i.e. for example "https://test.s3.amazonaws.com/mydata.csv" +/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv") then that response body is one of the main ways +/// to determine the correct region and try to repeat the request again with the correct region. +/// For any other request type (`GetObject`, `ListObjects`, etc.) AWS SDK does that because they have response bodies, +/// but for `HeadObject` there is no response body so this way doesn't work. +/// That's why it's better to avoid using `HeadObject` requests at all. +/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more details. 
diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h
index f0844c05abc..081fbab777e 100644
--- a/src/IO/S3Common.h
+++ b/src/IO/S3Common.h
@@ -11,15 +11,15 @@
 #if USE_AWS_S3
 
 #include
-#include
-#include
-#include
-#include
-#include
-
 #include
 #include
+#include
+#include
+#include
+
+
+namespace Aws::S3 { class S3Client; }
 
 namespace DB
 {
@@ -121,22 +121,36 @@ struct URI
     static void validateBucket(const String & bucket, const Poco::URI & uri);
 };
 
+/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead.
+/// Explanation: The `HeadObject` request never returns a response body (even if there is an error); however,
+/// if the request was sent without specifying a region in the endpoint (for example, "https://test.s3.amazonaws.com/mydata.csv"
+/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv"), then that response body is one of the main ways
+/// to determine the correct region and retry the request with it.
+/// For any other request type (`GetObject`, `ListObjects`, etc.) the AWS SDK does that, because those requests have response bodies,
+/// but `HeadObject` has no response body, so this way doesn't work.
+/// That's why it's better to avoid using `HeadObject` requests at all.
+/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more details.
+
 struct ObjectInfo
 {
     size_t size = 0;
     time_t last_modification_time = 0;
 };
 
-bool isNotFoundError(Aws::S3::S3Errors error);
+S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
 
-Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
-
-S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
-
-size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
+size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
 
 bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
 
+/// Checks if the object exists. If it doesn't exist, the function returns an error without throwing any exception.
+std::pair<bool, Aws::S3::S3Error> checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
+
+bool isNotFoundError(Aws::S3::S3Errors error);
+
+/// Returns the object's metadata.
+std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
+
 }
 
 #endif
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index ec77fc44de6..084d3b5470d 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -183,11 +183,11 @@ void WriteBufferFromS3::finalizeImpl()
     {
         LOG_TRACE(log, "Checking object {} exists after upload", key);
 
-        auto response = S3::headObject(*client_ptr, bucket, key, "", write_settings.for_object_storage);
-        if (!response.IsSuccess())
-            throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), response.GetError().GetErrorType());
-        else
+        auto [exists, error] = S3::checkObjectExists(*client_ptr, bucket, key, "", write_settings.for_object_storage);
+        if (exists)
             LOG_TRACE(log, "Object {} exists after upload", key);
+        else
+            throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), error.GetErrorType());
     }
 }
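Finally, the metadata half of the workaround, shown standalone: a ranged `GetObject` for a single byte returns the user metadata in the response headers without streaming the object body. The function name is hypothetical; the SDK calls mirror `getObjectMetadata` above:

```cpp
#include <aws/s3/S3Client.h>
#include <aws/s3/model/GetObjectRequest.h>

#include <stdexcept>

// Fetch only the x-amz-meta-* headers: request a single byte so the body transfer
// is negligible, then read the metadata map off the result.
Aws::Map<Aws::String, Aws::String> fetchMetadataOnly(
    const Aws::S3::S3Client & client, const Aws::String & bucket, const Aws::String & key)
{
    Aws::S3::Model::GetObjectRequest request;
    request.SetBucket(bucket);
    request.SetKey(key);
    request.SetRange("bytes=0-0"); // without a range, the whole object would be streamed

    auto outcome = client.GetObject(request);
    if (!outcome.IsSuccess())
        throw std::runtime_error(outcome.GetError().GetMessage().c_str());

    return outcome.GetResult().GetMetadata();
}
```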