mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-15 12:14:18 +00:00
Corrections after review and some improvements.
This commit is contained in:
parent
727b70f032
commit
a5cbac555c
@ -1,22 +1,35 @@
|
||||
#include <IO/S3/S3Capabilities.h>
|
||||
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
S3Capabilities::S3Capabilities(const S3Capabilities & src)
|
||||
: S3Capabilities(src.is_batch_delete_supported(), src.support_proxy)
|
||||
: S3Capabilities(src.isBatchDeleteSupported(), src.support_proxy)
|
||||
{
|
||||
}
|
||||
|
||||
std::optional<bool> S3Capabilities::is_batch_delete_supported() const
|
||||
std::optional<bool> S3Capabilities::isBatchDeleteSupported() const
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
return support_batch_delete;
|
||||
}
|
||||
|
||||
void S3Capabilities::set_is_batch_delete_supported(std::optional<bool> support_batch_delete_)
|
||||
void S3Capabilities::setIsBatchDeleteSupported(bool support_batch_delete_)
|
||||
{
|
||||
std::lock_guard lock{mutex};
|
||||
|
||||
if (support_batch_delete.has_value() && (support_batch_delete.value() != support_batch_delete_))
|
||||
{
|
||||
LOG_ERROR(getLogger("S3Capabilities"),
|
||||
"Got different results ({} vs {}) from checking if the cloud storage supports batch delete (DeleteObjects), "
|
||||
"the cloud storage API may be unstable",
|
||||
support_batch_delete.value(), support_batch_delete_);
|
||||
chassert(false && "Got different results from checking if the cloud storage supports batch delete");
|
||||
}
|
||||
|
||||
support_batch_delete = support_batch_delete_;
|
||||
}
|
||||
|
||||
|
@ -27,16 +27,17 @@ public:
|
||||
/// because looks like it misses some features:
|
||||
/// 1) batch delete (DeleteObjects)
|
||||
/// 2) upload part copy (UploadPartCopy)
|
||||
/// If `support_batch_delete` contains `std::nullopt` it means that it isn't clean yet if it's supported or not
|
||||
/// and should be detected from responses of the cloud storage.
|
||||
std::optional<bool> is_batch_delete_supported() const;
|
||||
void set_is_batch_delete_supported(std::optional<bool> support_batch_delete_);
|
||||
/// If `isBatchDeleteSupported()` returns `nullopt` it means that it isn't clear yet if it's supported or not
|
||||
/// and should be detected automatically from responses of the cloud storage.
|
||||
std::optional<bool> isBatchDeleteSupported() const;
|
||||
void setIsBatchDeleteSupported(bool support_batch_delete_);
|
||||
|
||||
/// Y.Cloud S3 implementation support proxy for connection
|
||||
const bool support_proxy{false};
|
||||
|
||||
private:
|
||||
/// `support_batch_delete` is guarded by mutex because function deleteFilesFromS3() can update this field from another thread.
|
||||
/// If `support_batch_delete == nullopt` that means it's not clear yet if it's supported or not.
|
||||
std::optional<bool> support_batch_delete TSA_GUARDED_BY(mutex);
|
||||
|
||||
mutable std::mutex mutex;
|
||||
|
@ -29,19 +29,21 @@ void deleteFileFromS3(
|
||||
size_t file_size_for_blob_storage_log,
|
||||
std::optional<ProfileEvents::Event> profile_event)
|
||||
{
|
||||
S3::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
|
||||
if (profile_event && *profile_event != ProfileEvents::S3DeleteObjects)
|
||||
ProfileEvents::increment(*profile_event);
|
||||
|
||||
auto log = getLogger("deleteFileFromS3");
|
||||
|
||||
S3::DeleteObjectRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
auto outcome = s3_client->DeleteObject(request);
|
||||
|
||||
auto log = getLogger("deleteFileFromS3");
|
||||
|
||||
if (blob_storage_log)
|
||||
{
|
||||
LOG_TRACE(log, "Writing Delete operation for blob {}", key);
|
||||
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
|
||||
bucket, key,
|
||||
local_path_for_blob_storage_log, file_size_for_blob_storage_log,
|
||||
@ -52,16 +54,22 @@ void deleteFileFromS3(
|
||||
LOG_TRACE(log, "No blob storage log, not writing blob {}", key);
|
||||
}
|
||||
|
||||
if (!outcome.IsSuccess() && (!if_exists || !S3::isNotFoundError(outcome.GetError().GetErrorType())))
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
LOG_INFO(log, "Object with path {} was removed from S3", key);
|
||||
}
|
||||
else if (if_exists && S3::isNotFoundError(outcome.GetError().GetErrorType()))
|
||||
{
|
||||
/// In this case even if absence of key may be ok for us, the log will be polluted with error messages from aws sdk.
|
||||
/// Looks like there is no way to suppress them.
|
||||
LOG_TRACE(log, "Object with path {} was skipped because it didn't exist", key);
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto & err = outcome.GetError();
|
||||
throw S3Exception(err.GetErrorType(), "{} (Code: {}) while removing object with path {} from S3",
|
||||
err.GetMessage(), static_cast<size_t>(err.GetErrorType()), key);
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Object with path {} was removed from S3", key);
|
||||
}
|
||||
|
||||
|
||||
@ -70,8 +78,7 @@ void deleteFilesFromS3(
|
||||
const String & bucket,
|
||||
const Strings & keys,
|
||||
bool if_exists,
|
||||
std::optional<bool> is_batch_delete_supported,
|
||||
std::function<void(bool)> set_is_batch_delete_supported,
|
||||
S3Capabilities & s3_capabilities,
|
||||
size_t batch_size,
|
||||
BlobStorageLogWriterPtr blob_storage_log,
|
||||
const Strings & local_paths_for_blob_storage_log,
|
||||
@ -81,31 +88,22 @@ void deleteFilesFromS3(
|
||||
if (keys.empty())
|
||||
return; /// Nothing to delete.
|
||||
|
||||
String empty_string;
|
||||
|
||||
if (keys.size() == 1)
|
||||
/// We're trying batch delete (DeleteObjects) first.
|
||||
bool try_batch_delete = true;
|
||||
{
|
||||
/// We're deleting one file - there is no need for the batch delete.
|
||||
const String & local_path_for_blob_storage_log = !local_paths_for_blob_storage_log.empty() ? local_paths_for_blob_storage_log[0] : empty_string;
|
||||
size_t file_size_for_blob_storage_log = !file_sizes_for_blob_storage_log.empty() ? file_sizes_for_blob_storage_log[0] : 0;
|
||||
deleteFileFromS3(s3_client, bucket, keys[0], if_exists,
|
||||
blob_storage_log, local_path_for_blob_storage_log, file_size_for_blob_storage_log,
|
||||
profile_event);
|
||||
return;
|
||||
if (keys.size() == 1)
|
||||
try_batch_delete = false; /// We're deleting one file - there is no need for batch delete.
|
||||
else if (batch_size < 2)
|
||||
try_batch_delete = false; /// Can't do batch delete with such small batches.
|
||||
else if (auto support_batch_delete = s3_capabilities.isBatchDeleteSupported();
|
||||
support_batch_delete.has_value() && !support_batch_delete.value())
|
||||
try_batch_delete = false; /// Support for batch delete is disabled.
|
||||
}
|
||||
|
||||
auto log = getLogger("deleteFileFromS3");
|
||||
String empty_string;
|
||||
|
||||
/// We're trying the batch delete first.
|
||||
bool try_batch_requests = true;
|
||||
{
|
||||
if (is_batch_delete_supported.has_value() && !is_batch_delete_supported.value())
|
||||
try_batch_requests = false;
|
||||
else if (batch_size < 2)
|
||||
try_batch_requests = false;
|
||||
}
|
||||
|
||||
if (try_batch_requests)
|
||||
if (try_batch_delete)
|
||||
{
|
||||
bool need_retry_with_plain_delete_object = false;
|
||||
size_t current_position = 0;
|
||||
@ -128,14 +126,16 @@ void deleteFilesFromS3(
|
||||
|
||||
Aws::S3::Model::Delete delkeys;
|
||||
delkeys.SetObjects(current_chunk);
|
||||
delkeys.SetQuiet(true);
|
||||
|
||||
S3::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
|
||||
if (profile_event && *profile_event != ProfileEvents::S3DeleteObjects)
|
||||
ProfileEvents::increment(*profile_event);
|
||||
|
||||
S3::DeleteObjectsRequest request;
|
||||
request.SetBucket(bucket);
|
||||
request.SetDelete(delkeys);
|
||||
auto outcome = s3_client->DeleteObjects(request);
|
||||
|
||||
if (blob_storage_log)
|
||||
@ -159,44 +159,99 @@ void deleteFilesFromS3(
|
||||
LOG_TRACE(log, "No blob storage log, not writing blobs [{}]", comma_separated_keys);
|
||||
}
|
||||
|
||||
if (!is_batch_delete_supported.has_value())
|
||||
if (outcome.IsSuccess())
|
||||
{
|
||||
if (!outcome.IsSuccess()
|
||||
&& (outcome.GetError().GetExceptionName() == "InvalidRequest"
|
||||
|| outcome.GetError().GetExceptionName() == "InvalidArgument"))
|
||||
/// DeleteObjects succeeded, that means some objects were removed (but maybe not all the objects).
|
||||
s3_capabilities.setIsBatchDeleteSupported(true);
|
||||
|
||||
const auto & errors = outcome.GetResult().GetErrors();
|
||||
if (errors.empty())
|
||||
{
|
||||
/// All the objects were removed.
|
||||
LOG_INFO(log, "Objects with paths [{}] were removed from S3", comma_separated_keys);
|
||||
}
|
||||
else
|
||||
{
|
||||
/// Mixed success/error response - some objects were removed, and some were not.
|
||||
/// We need to extract more detailed information from the outcome.
|
||||
std::unordered_set<std::string_view> removed_keys{keys.begin(), keys.end()};
|
||||
String not_found_keys;
|
||||
std::exception_ptr other_error;
|
||||
|
||||
for (const auto & err : errors)
|
||||
{
|
||||
removed_keys.erase(err.GetKey());
|
||||
auto error_type = static_cast<Aws::S3::S3Errors>(Aws::S3::S3ErrorMapper::GetErrorForName(err.GetCode().c_str()).GetErrorType());
|
||||
if (if_exists && S3::isNotFoundError(error_type))
|
||||
{
|
||||
if (not_found_keys.empty())
|
||||
not_found_keys += ", ";
|
||||
not_found_keys += err.GetKey();
|
||||
}
|
||||
else if (!other_error)
|
||||
{
|
||||
other_error = std::make_exception_ptr(
|
||||
S3Exception{error_type, "{} (Code: {}) while removing object with path {} from S3",
|
||||
err.GetMessage(), err.GetCode(), err.GetKey()});
|
||||
}
|
||||
}
|
||||
|
||||
if (!removed_keys.empty())
|
||||
{
|
||||
String removed_keys_comma_separated;
|
||||
for (const auto & key : removed_keys)
|
||||
{
|
||||
if (!removed_keys_comma_separated.empty())
|
||||
removed_keys_comma_separated += ", ";
|
||||
removed_keys_comma_separated += key;
|
||||
}
|
||||
LOG_INFO(log, "Objects with paths [{}] were removed from S3", removed_keys_comma_separated);
|
||||
}
|
||||
|
||||
if (!not_found_keys.empty())
|
||||
{
|
||||
/// In this case even if absence of key may be ok for us, the log will be polluted with error messages from aws sdk.
|
||||
/// Looks like there is no way to suppress them.
|
||||
LOG_TRACE(log, "Object with paths [{}] were skipped because they didn't exist", not_found_keys);
|
||||
}
|
||||
|
||||
if (other_error)
|
||||
std::rethrow_exception(other_error);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// DeleteObjects didn't succeed, that means either a) this operation isn't supported at all;
|
||||
/// or b) all the objects didn't exist; or c) some failure occurred.
|
||||
const auto & err = outcome.GetError();
|
||||
if (!s3_capabilities.isBatchDeleteSupported().has_value()
|
||||
&& ((err.GetExceptionName() == "InvalidRequest") || (err.GetExceptionName() == "InvalidArgument")
|
||||
|| (err.GetExceptionName() == "NotImplemented")))
|
||||
{
|
||||
const auto & err = outcome.GetError();
|
||||
LOG_TRACE(log, "DeleteObjects is not supported: {} (Code: {}). Retrying with plain DeleteObject.",
|
||||
err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
|
||||
is_batch_delete_supported = false;
|
||||
if (set_is_batch_delete_supported)
|
||||
set_is_batch_delete_supported(*is_batch_delete_supported);
|
||||
s3_capabilities.setIsBatchDeleteSupported(false);
|
||||
need_retry_with_plain_delete_object = true;
|
||||
break;
|
||||
}
|
||||
|
||||
if (if_exists && S3::isNotFoundError(err.GetErrorType()))
|
||||
{
|
||||
LOG_TRACE(log, "Object with paths [{}] were skipped because they didn't exist", comma_separated_keys);
|
||||
}
|
||||
else
|
||||
{
|
||||
throw S3Exception(err.GetErrorType(), "{} (Code: {}) while removing objects with paths [{}] from S3",
|
||||
err.GetMessage(), static_cast<size_t>(err.GetErrorType()), comma_separated_keys);
|
||||
}
|
||||
}
|
||||
|
||||
if (!outcome.IsSuccess() && (!if_exists || !S3::isNotFoundError(outcome.GetError().GetErrorType())))
|
||||
{
|
||||
/// In this case even if absence of key may be ok for us, the log will be polluted with error messages from aws sdk.
|
||||
/// Looks like there is no way to suppress them.
|
||||
const auto & err = outcome.GetError();
|
||||
throw S3Exception(err.GetErrorType(), "{} (Code: {}) while removing objects with paths [{}] from S3",
|
||||
err.GetMessage(), static_cast<size_t>(err.GetErrorType()), comma_separated_keys);
|
||||
}
|
||||
|
||||
is_batch_delete_supported = true;
|
||||
if (set_is_batch_delete_supported)
|
||||
set_is_batch_delete_supported(*is_batch_delete_supported);
|
||||
|
||||
LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", comma_separated_keys);
|
||||
}
|
||||
|
||||
if (!need_retry_with_plain_delete_object)
|
||||
return;
|
||||
}
|
||||
|
||||
/// The batch delete isn't supported so we'll delete all files sequentially.
|
||||
/// Batch delete (DeleteObjects) isn't supported so we'll delete all the files sequentially.
|
||||
for (size_t i = 0; i != keys.size(); ++i)
|
||||
{
|
||||
const String & local_path_for_blob_storage_log = (i < local_paths_for_blob_storage_log.size()) ? local_paths_for_blob_storage_log[i] : empty_string;
|
||||
@ -208,28 +263,6 @@ void deleteFilesFromS3(
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void deleteFilesFromS3(
|
||||
const std::shared_ptr<const S3::Client> & s3_client,
|
||||
const String & bucket,
|
||||
const Strings & keys,
|
||||
bool if_exists,
|
||||
S3Capabilities & s3_capabilities,
|
||||
size_t batch_size,
|
||||
BlobStorageLogWriterPtr blob_storage_log,
|
||||
const Strings & local_paths_for_blob_storage_log,
|
||||
const std::vector<size_t> & file_sizes_for_blob_storage_log,
|
||||
std::optional<ProfileEvents::Event> profile_event)
|
||||
{
|
||||
std::optional<bool> is_batch_delete_supported = s3_capabilities.is_batch_delete_supported();
|
||||
|
||||
auto set_is_batch_delete_supported
|
||||
= [&](bool support_batch_delete_) { s3_capabilities.set_is_batch_delete_supported(support_batch_delete_); };
|
||||
|
||||
deleteFilesFromS3(s3_client, bucket, keys, if_exists, is_batch_delete_supported, set_is_batch_delete_supported, batch_size,
|
||||
blob_storage_log, local_paths_for_blob_storage_log, file_sizes_for_blob_storage_log, profile_event);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -7,7 +7,6 @@
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Core/Types.h>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -35,19 +34,6 @@ void deleteFileFromS3(
|
||||
std::optional<ProfileEvents::Event> profile_event = std::nullopt);
|
||||
|
||||
/// Deletes multiple files from S3 using batch requests when it's possible.
|
||||
void deleteFilesFromS3(
|
||||
const std::shared_ptr<const S3::Client> & s3_client,
|
||||
const String & bucket,
|
||||
const Strings & keys,
|
||||
bool if_exists = false,
|
||||
std::optional<bool> is_batch_delete_supported = std::nullopt,
|
||||
std::function<void(bool)> set_is_batch_delete_supported = nullptr,
|
||||
size_t batch_size = 1000,
|
||||
BlobStorageLogWriterPtr blob_storage_log = nullptr,
|
||||
const Strings & local_paths_for_blob_storage_log = {},
|
||||
const std::vector<size_t> & file_sizes_for_blob_storage_log = {},
|
||||
std::optional<ProfileEvents::Event> profile_event = std::nullopt);
|
||||
|
||||
void deleteFilesFromS3(
|
||||
const std::shared_ptr<const S3::Client> & s3_client,
|
||||
const String & bucket,
|
||||
|
@ -66,7 +66,8 @@ namespace
|
||||
|
||||
bool isNotFoundError(Aws::S3::S3Errors error)
|
||||
{
|
||||
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
|
||||
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY
|
||||
|| error == Aws::S3::S3Errors::NO_SUCH_BUCKET;
|
||||
}
|
||||
|
||||
ObjectInfo getObjectInfo(
|
||||
|
@ -32,6 +32,7 @@ def request(command, url, headers={}, data=None):
|
||||
|
||||
|
||||
class RequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
# GetObject
|
||||
def do_GET(self):
|
||||
if self.path == "/":
|
||||
self.send_response(200)
|
||||
@ -41,12 +42,15 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
else:
|
||||
self.do_HEAD()
|
||||
|
||||
# PutObject
|
||||
def do_PUT(self):
|
||||
self.do_HEAD()
|
||||
|
||||
# DeleteObjects (/root?delete)
|
||||
def do_POST(self):
|
||||
self.do_HEAD()
|
||||
|
||||
# DeleteObject
|
||||
def do_DELETE(self):
|
||||
self.do_HEAD()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user