mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 01:25:21 +00:00
Make checkObjectExists() easier.
This commit is contained in:
parent
a955504043
commit
e435edb4ab
@ -309,8 +309,8 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
|
M(S3CopyObject, "Number of S3 API CopyObject calls.") \
|
||||||
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
|
M(S3ListObjects, "Number of S3 API ListObjects calls.") \
|
||||||
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
|
M(S3HeadObject, "Number of S3 API HeadObject calls.") \
|
||||||
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
|
M(S3GetObjectAttributes, "Number of S3 API GetObjectAttributes calls.") \
|
||||||
M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
|
M(S3GetObjectMetadata, "Number of S3 API GetObject calls for getting metadata.") \
|
||||||
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
|
M(S3CreateMultipartUpload, "Number of S3 API CreateMultipartUpload calls.") \
|
||||||
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
|
M(S3UploadPartCopy, "Number of S3 API UploadPartCopy calls.") \
|
||||||
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
|
M(S3UploadPart, "Number of S3 API UploadPart calls.") \
|
||||||
@ -323,8 +323,8 @@ The server successfully detected this situation and will download merged part fr
|
|||||||
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
|
M(DiskS3CopyObject, "Number of DiskS3 API CopyObject calls.") \
|
||||||
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
|
M(DiskS3ListObjects, "Number of DiskS3 API ListObjects calls.") \
|
||||||
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
|
M(DiskS3HeadObject, "Number of DiskS3 API HeadObject calls.") \
|
||||||
M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
|
M(DiskS3GetObjectAttributes, "Number of DiskS3 API GetObjectAttributes calls.") \
|
||||||
M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
|
M(DiskS3GetObjectMetadata, "Number of DiskS3 API GetObject calls for getting metadata.") \
|
||||||
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
|
M(DiskS3CreateMultipartUpload, "Number of DiskS3 API CreateMultipartUpload calls.") \
|
||||||
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
|
M(DiskS3UploadPartCopy, "Number of DiskS3 API UploadPartCopy calls.") \
|
||||||
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
|
M(DiskS3UploadPart, "Number of DiskS3 API UploadPart calls.") \
|
||||||
|
@ -135,9 +135,9 @@ bool S3ObjectStorage::exists(const StoredObject & object) const
|
|||||||
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
|
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<bool /* exists */, Aws::S3::S3Error> S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key) const
|
void S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key, const std::string_view & description) const
|
||||||
{
|
{
|
||||||
return S3::checkObjectExists(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true);
|
return S3::checkObjectExists(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true, description);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||||
@ -474,12 +474,7 @@ void S3ObjectStorage::copyObjectImpl(
|
|||||||
|
|
||||||
auto settings_ptr = s3_settings.get();
|
auto settings_ptr = s3_settings.get();
|
||||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||||
{
|
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
|
||||||
auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
|
|
||||||
if (!exists)
|
|
||||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::copyObjectMultipartImpl(
|
void S3ObjectStorage::copyObjectMultipartImpl(
|
||||||
@ -567,12 +562,7 @@ void S3ObjectStorage::copyObjectMultipartImpl(
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||||
{
|
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
|
||||||
auto [exists, error] = checkObjectExists(dst_bucket, dst_key);
|
|
||||||
if (!exists)
|
|
||||||
throw Exception(ErrorCodes::S3_ERROR, "Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", dst_key, dst_bucket);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void S3ObjectStorage::copyObject( // NOLINT
|
void S3ObjectStorage::copyObject( // NOLINT
|
||||||
|
@ -187,7 +187,7 @@ private:
|
|||||||
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
||||||
|
|
||||||
size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
|
size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
|
||||||
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const std::string & bucket_from, const std::string & key) const;
|
void checkObjectExists(const std::string & bucket_from, const std::string & key, const std::string_view & description) const;
|
||||||
|
|
||||||
std::string bucket;
|
std::string bucket;
|
||||||
|
|
||||||
|
@ -942,14 +942,14 @@ namespace S3
|
|||||||
return getObjectInfo(client, bucket, key, version_id, for_disk_s3, throw_on_error).size;
|
return getObjectInfo(client, bucket, key, version_id, for_disk_s3, throw_on_error).size;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
||||||
{
|
{
|
||||||
auto [exists, error] = checkObjectExists(client, bucket, key, version_id, for_disk_s3);
|
auto [exists, error] = checkObjectExists(client, bucket, key, version_id, for_disk_s3);
|
||||||
|
|
||||||
if (exists)
|
if (exists)
|
||||||
return true;
|
return true;
|
||||||
|
|
||||||
if (!throw_on_error || isNotFoundError(error.GetErrorType()))
|
if (isNotFoundError(error.GetErrorType()))
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
throw S3Exception(error.GetErrorType(),
|
throw S3Exception(error.GetErrorType(),
|
||||||
@ -957,12 +957,14 @@ namespace S3
|
|||||||
key, bucket, error.GetMessage());
|
key, bucket, error.GetMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3)
|
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, const std::string_view & description)
|
||||||
{
|
{
|
||||||
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
|
auto outcome = getObjectAttributes(client, bucket, key, version_id, for_disk_s3);
|
||||||
if (outcome.IsSuccess())
|
if (outcome.IsSuccess())
|
||||||
return {true, {}};
|
return;
|
||||||
return {false, std::move(outcome.GetError())};
|
const auto & error = outcome.GetError();
|
||||||
|
throw S3Exception(error.GetErrorType(), "{}Object {} in bucket {} suddenly disappeared: {}",
|
||||||
|
(description.empty() ? "" : (String(description) + ": ")), key, bucket, error.GetErrorMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
std::map<String, String> getObjectMetadata(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id, bool for_disk_s3, bool throw_on_error)
|
||||||
@ -988,7 +990,7 @@ namespace S3
|
|||||||
|
|
||||||
if (outcome.IsSuccess())
|
if (outcome.IsSuccess())
|
||||||
return outcome.GetResult().GetMetadata();
|
return outcome.GetResult().GetMetadata();
|
||||||
|
|
||||||
if (!throw_on_error)
|
if (!throw_on_error)
|
||||||
return {};
|
return {};
|
||||||
|
|
||||||
|
@ -141,10 +141,10 @@ S3::ObjectInfo getObjectInfo(const Aws::S3::S3Client & client, const String & bu
|
|||||||
|
|
||||||
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
size_t getObjectSize(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
||||||
|
|
||||||
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, bool throw_on_error = true);
|
bool objectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
||||||
|
|
||||||
/// Checks if the object exists. If it doesn't exists the function returns an error without throwing any exception.
|
/// Throws an exception if a specified object doesn't exist. `description` is used as a part of the error message.
|
||||||
std::pair<bool /* exists */, Aws::S3::S3Error> checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false);
|
void checkObjectExists(const Aws::S3::S3Client & client, const String & bucket, const String & key, const String & version_id = "", bool for_disk_s3 = false, const std::string_view & description = {});
|
||||||
|
|
||||||
bool isNotFoundError(Aws::S3::S3Errors error);
|
bool isNotFoundError(Aws::S3::S3Errors error);
|
||||||
|
|
||||||
|
@ -182,12 +182,8 @@ void WriteBufferFromS3::finalizeImpl()
|
|||||||
if (check_objects_after_upload)
|
if (check_objects_after_upload)
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
LOG_TRACE(log, "Checking object {} exists after upload", key);
|
||||||
|
S3::checkObjectExists(*client_ptr, bucket, key, {}, /* for_disk_s3= */ write_settings.for_object_storage, "Immediately after upload");
|
||||||
auto [exists, error] = S3::checkObjectExists(*client_ptr, bucket, key, "", write_settings.for_object_storage);
|
LOG_TRACE(log, "Object {} exists after upload", key);
|
||||||
if (exists)
|
|
||||||
LOG_TRACE(log, "Object {} exists after upload", key);
|
|
||||||
else
|
|
||||||
throw S3Exception(fmt::format("Object {} from bucket {} disappeared immediately after upload, it's a bug in S3 or S3 API.", key, bucket), error.GetErrorType());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user