mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Stop using HeadObject requests in S3
because they don't work well with endpoints without explicit region.
This commit is contained in:
parent
9d5ec474a3
commit
21b8aaeb8b
@ -156,10 +156,9 @@ void BackupWriterS3::copyObjectImpl(
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
const Aws::S3::Model::HeadObjectResult & head,
|
size_t size,
|
||||||
const std::optional<ObjectAttributes> & metadata) const
|
const std::optional<ObjectAttributes> & metadata) const
|
||||||
{
|
{
|
||||||
size_t size = head.GetContentLength();
|
|
||||||
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
|
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
|
||||||
|
|
||||||
Aws::S3::Model::CopyObjectRequest request;
|
Aws::S3::Model::CopyObjectRequest request;
|
||||||
@ -177,7 +176,7 @@ void BackupWriterS3::copyObjectImpl(
|
|||||||
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|
||||||
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
|
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
|
||||||
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
||||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
|
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,10 +190,9 @@ void BackupWriterS3::copyObjectMultipartImpl(
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
const Aws::S3::Model::HeadObjectResult & head,
|
size_t size,
|
||||||
const std::optional<ObjectAttributes> & metadata) const
|
const std::optional<ObjectAttributes> & metadata) const
|
||||||
{
|
{
|
||||||
size_t size = head.GetContentLength();
|
|
||||||
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
|
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
|
||||||
|
|
||||||
String multipart_upload_id;
|
String multipart_upload_id;
|
||||||
@ -309,16 +307,16 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
|
|||||||
std::string source_bucket = object_storage->getObjectsNamespace();
|
std::string source_bucket = object_storage->getObjectsNamespace();
|
||||||
auto file_path = fs::path(s3_uri.key) / file_name_to;
|
auto file_path = fs::path(s3_uri.key) / file_name_to;
|
||||||
|
|
||||||
auto head = S3::headObject(*client, source_bucket, objects[0].absolute_path).GetResult();
|
auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
|
||||||
if (static_cast<size_t>(head.GetContentLength()) < request_settings.getUploadSettings().max_single_operation_copy_size)
|
if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
|
||||||
{
|
{
|
||||||
copyObjectImpl(
|
copyObjectImpl(
|
||||||
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
|
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
copyObjectMultipartImpl(
|
copyObjectMultipartImpl(
|
||||||
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
|
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,7 +67,7 @@ private:
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
const Aws::S3::Model::HeadObjectResult & head,
|
size_t size,
|
||||||
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
|
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
|
||||||
|
|
||||||
void copyObjectMultipartImpl(
|
void copyObjectMultipartImpl(
|
||||||
@ -75,7 +75,7 @@ private:
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
const Aws::S3::Model::HeadObjectResult & head,
|
size_t size,
|
||||||
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
|
const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
|
||||||
|
|
||||||
void removeFilesBatch(const Strings & file_names);
|
void removeFilesBatch(const Strings & file_names);
|
||||||
|
@ -309,6 +309,8 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
|
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
|
||||||
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
|
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
|
||||||
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
|
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
|
||||||
|
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
|
||||||
|
M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
|
||||||
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
|
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
|
||||||
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
|
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
|
||||||
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
|
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
|
||||||
@ -321,6 +323,8 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
|
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
|
||||||
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
|
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
|
||||||
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
|
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
|
||||||
|
M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
|
||||||
|
M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
|
||||||
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
|
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
|
||||||
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
|
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
|
||||||
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
|
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
|
||||||
|
@ -125,9 +125,9 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
|
|||||||
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
|
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
|
||||||
}
|
}
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectOutcome S3ObjectStorage::requestObjectHeadData(const std::string & bucket_from, const std::string & key) const
|
size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const
|
||||||
{
|
{
|
||||||
return S3::headObject(*client.get(), bucket_from, key, "", true);
|
return S3::getObjectSize(*client.get(), bucket_from, key, "", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||||
@ -135,6 +135,11 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
|
|||||||
return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true);
|
return S3::objectExists(*client.get(), bucket, object.absolute_path, "", true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::pair<bool /* exists */, Aws::S3::S3Error> S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key) const
|
||||||
|
{
|
||||||
|
return S3::checkObjectExists(*client.get(), bucket_from, key, "", true);
|
||||||
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||||
const StoredObjects & objects,
|
const StoredObjects & objects,
|
||||||
const ReadSettings & read_settings,
|
const ReadSettings & read_settings,
|
||||||
@ -409,13 +414,10 @@ ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) cons
|
|||||||
{
|
{
|
||||||
ObjectMetadata result;
|
ObjectMetadata result;
|
||||||
|
|
||||||
auto object_head = requestObjectHeadData(bucket, path);
|
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, "", true, true);
|
||||||
throwIfError(object_head);
|
result.size_bytes = object_info.size;
|
||||||
|
result.last_modified = object_info.last_modification_time;
|
||||||
auto & object_head_result = object_head.GetResult();
|
result.attributes = S3::getObjectMetadata(*client.get(), bucket, path, "", true, true);
|
||||||
result.size_bytes = object_head_result.GetContentLength();
|
|
||||||
result.last_modified = object_head_result.GetLastModified().Millis();
|
|
||||||
result.attributes = object_head_result.GetMetadata();
|
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
@ -442,7 +444,7 @@ void S3ObjectStorage::copyObjectImpl(
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
size_t size,
|
||||||
std::optional<ObjectAttributes> metadata) const
|
std::optional<ObjectAttributes> metadata) const
|
||||||
{
|
{
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
@ -464,7 +466,7 @@ void S3ObjectStorage::copyObjectImpl(
|
|||||||
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|
||||||
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
|
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
|
||||||
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
||||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
|
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -473,8 +475,8 @@ void S3ObjectStorage::copyObjectImpl(
|
|||||||
auto settings_ptr = s3_settings.get();
|
auto settings_ptr = s3_settings.get();
|
||||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||||
{
|
{
|
||||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
|
||||||
if (!object_head.IsSuccess())
|
if (!exists)
|
||||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -485,15 +487,11 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
size_t size,
|
||||||
std::optional<ObjectAttributes> metadata) const
|
std::optional<ObjectAttributes> metadata) const
|
||||||
{
|
{
|
||||||
if (!head)
|
|
||||||
head = requestObjectHeadData(src_bucket, src_key).GetResult();
|
|
||||||
|
|
||||||
auto settings_ptr = s3_settings.get();
|
auto settings_ptr = s3_settings.get();
|
||||||
auto client_ptr = client.get();
|
auto client_ptr = client.get();
|
||||||
size_t size = head->GetContentLength();
|
|
||||||
|
|
||||||
String multipart_upload_id;
|
String multipart_upload_id;
|
||||||
|
|
||||||
@ -570,8 +568,8 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
|||||||
|
|
||||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||||
{
|
{
|
||||||
auto object_head = requestObjectHeadData(dst_bucket, dst_key);
|
auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
|
||||||
if (!object_head.IsSuccess())
|
if (!exists)
|
||||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -580,18 +578,18 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
|||||||
void S3ObjectStorage::copyObject( // NOLINT
|
void S3ObjectStorage::copyObject( // NOLINT
|
||||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||||
{
|
{
|
||||||
auto head = requestObjectHeadData(bucket, object_from.absolute_path).GetResult();
|
auto size = getObjectSize(bucket, object_from.absolute_path);
|
||||||
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
|
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
|
||||||
|
|
||||||
if (head.GetContentLength() >= multipart_upload_threashold)
|
if (size >= multipart_upload_threashold)
|
||||||
{
|
{
|
||||||
copyObjectMultipartImpl(
|
copyObjectMultipartImpl(
|
||||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
|
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
copyObjectImpl(
|
copyObjectImpl(
|
||||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, head, object_to_attributes);
|
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,7 +172,7 @@ private:
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
size_t size,
|
||||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||||
|
|
||||||
void copyObjectMultipartImpl(
|
void copyObjectMultipartImpl(
|
||||||
@ -180,13 +180,14 @@ private:
|
|||||||
const String & src_key,
|
const String & src_key,
|
||||||
const String & dst_bucket,
|
const String & dst_bucket,
|
||||||
const String & dst_key,
|
const String & dst_key,
|
||||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
size_t size,
|
||||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||||
|
|
||||||
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
||||||
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
|
size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
|
||||||
|
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const std::string & bucket_from, const std::string & key) const;
|
||||||
|
|
||||||
std::string bucket;
|
std::string bucket;
|
||||||
|
|
||||||
|
@ -27,7 +27,8 @@
|
|||||||
# include <aws/core/utils/UUID.h>
|
# include <aws/core/utils/UUID.h>
|
||||||
# include <aws/core/http/HttpClientFactory.h>
|
# include <aws/core/http/HttpClientFactory.h>
|
||||||
# include <aws/s3/S3Client.h>
|
# include <aws/s3/S3Client.h>
|
||||||
# include <aws/s3/model/HeadObjectRequest.h>
|
# include <aws/s3/model/GetObjectAttributesRequest.h>
|
||||||
|
# include <aws/s3/model/GetObjectRequest.h>
|
||||||
|
|
||||||
# include <IO/S3/PocoHTTPClientFactory.h>
|
# include <IO/S3/PocoHTTPClientFactory.h>
|
||||||
# include <IO/S3/PocoHTTPClient.h>
|
# include <IO/S3/PocoHTTPClient.h>
|
||||||
@ -40,8 +41,10 @@
|
|||||||
|
|
||||||
namespace ProfileEvents
|
namespace ProfileEvents
|
||||||
{
|
{
|
||||||
extern const Event S3HeadObject;
|
extern const Event S3GetObjectAttributes;
|
||||||
extern const Event DiskS3HeadObject;
|
extern const Event DiskS3GetObjectAttributes;
|
||||||
|
extern const Event S3GetObjectMetadata;
|
||||||
|
extern const Event DiskS3GetObjectMetadata;
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
@ -699,6 +702,28 @@ public:
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// Performs a GetObjectAttributes request.
|
||||||
|
Aws::S3::Model::GetObjectAttributesOutcome getObjectAttributes(
|
||||||
|
const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||||
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::S3GetObjectAttributes);
|
||||||
|
if (for_disk_s3)
|
||||||
|
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectAttributes);
|
||||||
|
|
||||||
|
/// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h.
|
||||||
|
|
||||||
|
Aws::S3::Model::GetObjectAttributesRequest req;
|
||||||
|
req.SetBucket(bucket);
|
||||||
|
req.SetKey(key);
|
||||||
|
|
||||||
|
if (!version_id.empty())
|
||||||
|
req.SetVersionId(version_id);
|
||||||
|
|
||||||
|
req.SetObjectAttributes({Aws::S3::Model::ObjectAttributes::ObjectSize});
|
||||||
|
|
||||||
|
return client.GetObjectAttributes(req);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -894,36 +919,19 @@ namespace S3
|
|||||||
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
|
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
|
||||||
}
|
}
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
|
||||||
{
|
|
||||||
ProfileEvents::increment(ProfileEvents::S3HeadObject);
|
|
||||||
if (for_disk_s3)
|
|
||||||
ProfileEvents::increment(ProfileEvents::DiskS3HeadObject);
|
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectRequest req;
|
|
||||||
req.SetBucket(bucket);
|
|
||||||
req.SetKey(key);
|
|
||||||
|
|
||||||
if (!version_id.empty())
|
|
||||||
req.SetVersionId(version_id);
|
|
||||||
|
|
||||||
return client.HeadObject(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||||
{
|
{
|
||||||
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
|
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
|
||||||
|
|
||||||
if (outcome.IsSuccess())
|
if (outcome.IsSuccess())
|
||||||
{
|
{
|
||||||
auto read_result = outcome.GetResultWithOwnership();
|
const auto & result = outcome.GetResult();
|
||||||
return {.size = static_cast<size_t>(read_result.GetContentLength()), .last_modification_time = read_result.GetLastModified().Millis() / 1000};
|
return {.size = static_cast<size_t>(result.GetObjectSize()), .last_modification_time = result.GetLastModified().Millis() / 1000};
|
||||||
}
|
}
|
||||||
else if (throw_on_error)
|
else if (throw_on_error)
|
||||||
{
|
{
|
||||||
const auto & error = outcome.GetError();
|
const auto & error = outcome.GetError();
|
||||||
throw DB::Exception(ErrorCodes::S3_ERROR,
|
throw DB::Exception(ErrorCodes::S3_ERROR,
|
||||||
"Failed to HEAD object: {}. HTTP response code: {}",
|
"Failed to get object attributes: {}. HTTP response code: {}",
|
||||||
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
error.GetMessage(), static_cast<size_t>(error.GetResponseCode()));
|
||||||
}
|
}
|
||||||
return {};
|
return {};
|
||||||
@ -936,12 +944,11 @@ namespace S3
|
|||||||
|
|
||||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||||
{
|
{
|
||||||
auto outcome = headObject(client, bucket, key, version_id, for_disk_s3);
|
auto [exists, error] = checkObjectExists(client, bucket, key, version_id, for_disk_s3);
|
||||||
|
|
||||||
if (outcome.IsSuccess())
|
if (exists)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
const auto & error = outcome.GetError();
|
|
||||||
if (isNotFoundError(error.GetErrorType()))
|
if (isNotFoundError(error.GetErrorType()))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
@ -949,6 +956,47 @@ namespace S3
|
|||||||
"Failed to check existence of key {} in bucket {}: {}",
|
"Failed to check existence of key {} in bucket {}: {}",
|
||||||
key, bucket, error.GetMessage());
|
key, bucket, error.GetMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||||
|
{
|
||||||
|
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
|
||||||
|
if (outcome.IsSuccess())
|
||||||
|
return {true, {}};
|
||||||
|
return {false, std::move(outcome.GetError())};
|
||||||
|
}
|
||||||
|
|
||||||
|
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3)
|
||||||
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::S3GetObjectMetadata);
|
||||||
|
if (for_disk_s3)
|
||||||
|
ProfileEvents::increment(ProfileEvents::DiskS3GetObjectMetadata);
|
||||||
|
|
||||||
|
/// We must not use the `HeadObject` request, see the comment about `HeadObjectRequest` in S3Common.h.
|
||||||
|
|
||||||
|
Aws::S3::Model::GetObjectRequest req;
|
||||||
|
req.SetBucket(bucket);
|
||||||
|
req.SetKey(key);
|
||||||
|
|
||||||
|
/// Only the first byte will be read.
|
||||||
|
/// We don't need that first byte but the range should be set otherwise the entire object will be read.
|
||||||
|
req.SetRange("bytes=0-0");
|
||||||
|
|
||||||
|
if (!version_id.empty())
|
||||||
|
req.SetVersionId(version_id);
|
||||||
|
|
||||||
|
auto outcome = client.GetObject(req);
|
||||||
|
|
||||||
|
if (outcome.IsSuccess())
|
||||||
|
return outcome.GetResult().GetMetadata();
|
||||||
|
|
||||||
|
if (!throw_on_error)
|
||||||
|
return {};
|
||||||
|
|
||||||
|
const auto & error = outcome.GetError();
|
||||||
|
throw S3Exception(error.GetErrorType(),
|
||||||
|
"Failed to get metadata of key {} in bucket {}: {}",
|
||||||
|
key, bucket, error.GetMessage());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -11,15 +11,15 @@
|
|||||||
#if USE_AWS_S3
|
#if USE_AWS_S3
|
||||||
|
|
||||||
#include <base/types.h>
|
#include <base/types.h>
|
||||||
#include <aws/core/Aws.h>
|
|
||||||
#include <aws/core/client/ClientConfiguration.h>
|
|
||||||
#include <aws/s3/S3Client.h>
|
|
||||||
#include <aws/s3/S3Errors.h>
|
|
||||||
#include <Poco/URI.h>
|
|
||||||
|
|
||||||
#include <Common/Exception.h>
|
#include <Common/Exception.h>
|
||||||
#include <Common/Throttler_fwd.h>
|
#include <Common/Throttler_fwd.h>
|
||||||
|
|
||||||
|
#include <Poco/URI.h>
|
||||||
|
#include <aws/core/Aws.h>
|
||||||
|
#include <aws/s3/S3Errors.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace Aws::S3 { class S3Client; }
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
@ -121,22 +121,36 @@ struct URI
|
|||||||
static void validateBucket(const String & bucket, const Poco::URI & uri);
|
static void validateBucket(const String & bucket, const Poco::URI & uri);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/// WARNING: Don't use `HeadObjectRequest`! Use the functions below instead.
|
||||||
|
/// Explanation: The `HeadObject` request never returns a response body (even if there is an error) however
|
||||||
|
/// if the request was sent without specifying a region in the endpoint (i.e. for example "https://test.s3.amazonaws.com/mydata.csv"
|
||||||
|
/// instead of "https://test.s3-us-west-2.amazonaws.com/mydata.csv") then that response body is one of the main ways
|
||||||
|
/// to determine the correct region and try to repeat the request again with the correct region.
|
||||||
|
/// For any other request type (`GetObject`, `ListObjects`, etc.) AWS SDK does that because they have response bodies,
|
||||||
|
/// but for `HeadObject` there is no response body so this way doesn't work.
|
||||||
|
/// That's why it's better to avoid using `HeadObject` requests at all.
|
||||||
|
/// See https://github.com/aws/aws-sdk-cpp/issues/1558 and also the function S3ErrorMarshaller::ExtractRegion() for more details.
|
||||||
|
|
||||||
struct ObjectInfo
|
struct ObjectInfo
|
||||||
{
|
{
|
||||||
size_t size = 0;
|
size_t size = 0;
|
||||||
time_t last_modification_time = 0;
|
time_t last_modification_time = 0;
|
||||||
};
|
};
|
||||||
|
|
||||||
bool isNotFoundError(Aws::S3::S3Errors error);
|
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
|
||||||
|
|
||||||
Aws::S3::Model::HeadObjectOutcome headObject(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
|
||||||
|
|
||||||
S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
|
||||||
|
|
||||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool throw_on_error, bool for_disk_s3);
|
|
||||||
|
|
||||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
||||||
|
|
||||||
|
/// Checks if the object exists. If it doesn't exists the function returns an error without throwing any exception.
|
||||||
|
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
||||||
|
|
||||||
|
bool isNotFoundError(Aws::S3::S3Errors error);
|
||||||
|
|
||||||
|
/// Returns the object's metadata.
|
||||||
|
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool throw_on_error = true, bool for_disk_s3 = false);
|
||||||
|
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
@ -183,11 +183,11 @@ void WriteBufferFromS3::finalizeImpl()
|
|||||||
{
|
{
|
||||||
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
||||||
|
|
||||||
auto response = S3::headObject(*client_ptr, bucket, key, "", write_settings.for_object_storage);
|
auto [exists, error] = S3::checkObjectExists(*client_ptr, bucket, key, "", write_settings.for_object_storage);
|
||||||
if (!response.IsSuccess())
|
if (exists)
|
||||||
throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), response.GetError().GetErrorType());
|
|
||||||
else
|
|
||||||
LOG_TRACE(log, "Object {} exists after upload", key);
|
LOG_TRACE(log, "Object {} exists after upload", key);
|
||||||
|
else
|
||||||
|
throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), error.GetErrorType());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user