Merge pull request #70786 from vitlibar/improve-checking-capability-for-batch-delete-from-s3
Improve checking capability for batch delete from S3
commit d1e8358bcb
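In short, the commit replaces the old probe-based capability check (checkBatchRemove, which wrote and deleted a test object during the disk access check) with detection based on the responses to real delete requests: batch delete (DeleteObjects) is tried first, and if the storage answers InvalidRequest, InvalidArgument or NotImplemented, the capability is recorded as unsupported and the code falls back to per-key DeleteObject. The logic lives in the new helpers deleteFileFromS3()/deleteFilesFromS3() shown further down. A minimal standalone sketch of that fallback flow (StubClient and its error strings are hypothetical stand-ins, not the AWS SDK or the ClickHouse API):

// Minimal sketch of the fallback logic this commit introduces.
// Assumption: StubClient is a hypothetical stand-in for the S3 client.
#include <iostream>
#include <optional>
#include <stdexcept>
#include <string>
#include <vector>

struct StubClient
{
    bool batch_delete_supported = false;

    // Returns an error name similar to what DeleteObjects would produce, or "" on success.
    std::string deleteObjects(const std::vector<std::string> &) const
    {
        return batch_delete_supported ? "" : "NotImplemented";
    }

    void deleteObject(const std::string & key) const { std::cout << "DeleteObject " << key << '\n'; }
};

void deleteKeys(StubClient & client, const std::vector<std::string> & keys, std::optional<bool> & supports_batch)
{
    if (!supports_batch.has_value() || *supports_batch)
    {
        std::string error = client.deleteObjects(keys);
        if (error.empty())
        {
            supports_batch = true;   // DeleteObjects worked, remember that.
            return;
        }
        if (error == "InvalidRequest" || error == "InvalidArgument" || error == "NotImplemented")
            supports_batch = false;  // Batch delete is not supported, remember and fall back.
        else
            throw std::runtime_error(error);
    }
    for (const auto & key : keys)    // Fallback: plain DeleteObject per key.
        client.deleteObject(key);
}

int main()
{
    StubClient client;
    std::optional<bool> supports_batch;  // nullopt: not known yet, detect from responses.
    deleteKeys(client, {"a", "b"}, supports_batch);
    std::cout << "supports batch delete: " << (*supports_batch ? "yes" : "no") << '\n';
}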
@@ -10,6 +10,7 @@
#include <IO/WriteBufferFromS3.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/copyS3File.h>
#include <IO/S3/deleteFileFromS3.h>
#include <IO/S3/Client.h>
#include <IO/S3/Credentials.h>
#include <Disks/IDisk.h>
@@ -117,12 +118,6 @@ namespace
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
return outcome.GetResult().GetContents();
}

bool isNotFoundError(Aws::S3::S3Errors error)
{
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND
|| error == Aws::S3::S3Errors::NO_SUCH_KEY;
}
}


@@ -236,6 +231,7 @@ BackupWriterS3::BackupWriterS3(
: BackupWriterDefault(read_settings_, write_settings_, getLogger("BackupWriterS3"))
, s3_uri(s3_uri_)
, data_source_description{DataSourceType::ObjectStorage, ObjectStorageType::S3, MetadataStorageType::None, s3_uri.endpoint, false, false}
, s3_capabilities(getCapabilitiesFromConfig(context_->getConfigRef(), "s3"))
{
s3_settings.loadFromConfig(context_->getConfigRef(), "s3", context_->getSettingsRef());

@@ -358,92 +354,22 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)

void BackupWriterS3::removeFile(const String & file_name)
{
S3::DeleteObjectRequest request;
request.SetBucket(s3_uri.bucket);
auto key = fs::path(s3_uri.key) / file_name;
request.SetKey(key);

auto outcome = client->DeleteObject(request);

if (blob_storage_log)
{
blob_storage_log->addEvent(
BlobStorageLogElement::EventType::Delete,
s3_uri.bucket, key, /* local_path */ "", /* data_size */ 0,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}

if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
deleteFileFromS3(client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, /* if_exists = */ false,
blob_storage_log);
}

void BackupWriterS3::removeFiles(const Strings & file_names)
{
try
{
if (!supports_batch_delete.has_value() || supports_batch_delete.value() == true)
{
removeFilesBatch(file_names);
supports_batch_delete = true;
}
else
{
for (const auto & file_name : file_names)
removeFile(file_name);
}
}
catch (const Exception &)
{
if (!supports_batch_delete.has_value())
{
supports_batch_delete = false;
LOG_TRACE(log, "DeleteObjects is not supported. Retrying with plain DeleteObject.");
Strings keys;
keys.reserve(file_names.size());
for (const String & file_name : file_names)
keys.push_back(fs::path(s3_uri.key) / file_name);

for (const auto & file_name : file_names)
removeFile(file_name);
}
else
throw;
}

}

void BackupWriterS3::removeFilesBatch(const Strings & file_names)
{
/// One call of DeleteObjects() cannot remove more than 1000 keys.
size_t chunk_size_limit = 1000;
size_t batch_size = 1000;

size_t current_position = 0;
while (current_position < file_names.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
for (; current_position < file_names.size() && current_chunk.size() < chunk_size_limit; ++current_position)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(fs::path(s3_uri.key) / file_names[current_position]);
current_chunk.push_back(obj);
}

Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
S3::DeleteObjectsRequest request;
request.SetBucket(s3_uri.bucket);
request.SetDelete(delkeys);

auto outcome = client->DeleteObjects(request);

if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (const auto & obj : current_chunk)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete, s3_uri.bucket, obj.GetKey(),
/* local_path */ "", /* data_size */ 0, outcome_error, time_now);
}

if (!outcome.IsSuccess() && !isNotFoundError(outcome.GetError().GetErrorType()))
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
deleteFilesFromS3(client, s3_uri.bucket, keys, /* if_exists = */ false,
s3_capabilities, batch_size, blob_storage_log);
}

}
@@ -10,6 +10,7 @@
#include <IO/S3Settings.h>
#include <Interpreters/Context_fwd.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <IO/S3/S3Capabilities.h>

namespace DB
{
@@ -76,14 +77,12 @@ public:

private:
std::unique_ptr<ReadBuffer> readFile(const String & file_name, size_t expected_file_size) override;
void removeFilesBatch(const Strings & file_names);

const S3::URI s3_uri;
const DataSourceDescription data_source_description;
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
std::optional<bool> supports_batch_delete;

S3Capabilities s3_capabilities;
BlobStorageLogWriterPtr blob_storage_log;
};
@@ -158,21 +158,6 @@ S3::URI getS3URI(const Poco::Util::AbstractConfiguration & config, const std::st
return uri;
}

void checkS3Capabilities(
S3ObjectStorage & storage, const S3Capabilities s3_capabilities, const String & name)
{
/// If `support_batch_delete` is turned on (default), check and possibly switch it off.
if (s3_capabilities.support_batch_delete && !checkBatchRemove(storage))
{
LOG_WARNING(
getLogger("S3ObjectStorage"),
"Storage for disk {} does not support batch delete operations, "
"so `s3_capabilities.support_batch_delete` was automatically turned off during the access check. "
"To remove this message set `s3_capabilities.support_batch_delete` for the disk to `false`.",
name);
storage.setCapabilitiesSupportBatchDelete(false);
}
}
}

static std::string getEndpoint(
@@ -192,7 +177,7 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) -> ObjectStoragePtr
bool /* skip_access_check */) -> ObjectStoragePtr
{
auto uri = getS3URI(config, config_prefix, context);
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
@@ -204,10 +189,6 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
auto object_storage = createObjectStorage<S3ObjectStorage>(
ObjectStorageType::S3, config, config_prefix, std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);

/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);

return object_storage;
});
}
@@ -221,7 +202,7 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) -> ObjectStoragePtr
bool /* skip_access_check */) -> ObjectStoragePtr
{
/// send_metadata changes the filenames (includes revision), while
/// s3_plain do not care about this, and expect that the file name
@@ -241,10 +222,6 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
auto object_storage = std::make_shared<PlainObjectStorage<S3ObjectStorage>>(
std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);

/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);

return object_storage;
});
}
@@ -259,7 +236,7 @@ void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const ContextPtr & context,
bool skip_access_check) -> ObjectStoragePtr
bool /* skip_access_check */) -> ObjectStoragePtr
{
/// send_metadata changes the filenames (includes revision), while
/// s3_plain_rewritable does not support file renaming.
@@ -277,10 +254,6 @@ void registerS3PlainRewritableObjectStorage(ObjectStorageFactory & factory)
auto object_storage = std::make_shared<PlainRewritableObjectStorage<S3ObjectStorage>>(
std::move(metadata_storage_metrics), std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);

/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);

return object_storage;
});
}
@@ -2,8 +2,6 @@

#if USE_AWS_S3
#include <Disks/ObjectStorages/DiskObjectStorageMetadata.h>
#include <Disks/ObjectStorages/S3/S3ObjectStorage.h>
#include <Core/ServerUUID.h>
#include <IO/S3/URI.h>

namespace DB
@@ -11,7 +9,6 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}

ObjectStorageKeysGeneratorPtr getKeyGenerator(
@@ -65,58 +62,6 @@ ObjectStorageKeysGeneratorPtr getKeyGenerator(
return createObjectStorageKeysGeneratorByTemplate(object_key_template);
}

static String getServerUUID()
{
UUID server_uuid = ServerUUID::get();
if (server_uuid == UUIDHelpers::Nil)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Server UUID is not initialized");
return toString(server_uuid);
}

bool checkBatchRemove(S3ObjectStorage & storage)
{
/// NOTE: Here we are going to write and later drop some key.
/// We are using generateObjectKeyForPath() which returns random object key.
/// That generated key is placed in a right directory where we should have write access.
const String path = fmt::format("clickhouse_remove_objects_capability_{}", getServerUUID());
const auto key = storage.generateObjectKeyForPath(path, {} /* key_prefix */);
StoredObject object(key.serialize(), path);
try
{
auto file = storage.writeObject(object, WriteMode::Rewrite);
file->write("test", 4);
file->finalize();
}
catch (...)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
/// We don't have write access, therefore no information about batch remove.
return true;
}
try
{
/// Uses `DeleteObjects` request (batch delete).
storage.removeObjects({object});
return true;
}
catch (const Exception &)
{
try
{
storage.removeObject(object);
}
catch (...) // NOLINT(bugprone-empty-catch)
{
}
return false;
}
}
}

#endif
@@ -16,9 +16,6 @@ ObjectStorageKeysGeneratorPtr getKeyGenerator(
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);

class S3ObjectStorage;
bool checkBatchRemove(S3ObjectStorage & storage);

}

#endif
@@ -1,15 +0,0 @@
#include <Disks/ObjectStorages/S3/S3Capabilities.h>

namespace DB
{

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
return S3Capabilities
{
.support_batch_delete = config.getBool(config_prefix + ".support_batch_delete", true),
.support_proxy = config.getBool(config_prefix + ".support_proxy", config.has(config_prefix + ".proxy")),
};
}

}
@@ -1,26 +0,0 @@
#pragma once

#include <string>
#include <Poco/Util/AbstractConfiguration.h>

namespace DB
{

/// Supported/unsupported features by different S3 implementations
/// Can be useful only for almost compatible with AWS S3 versions.
struct S3Capabilities
{
/// Google S3 implementation doesn't support batch delete
/// TODO: possibly we have to use Google SDK https://github.com/googleapis/google-cloud-cpp/tree/main/google/cloud/storage
/// because looks like it misses some features:
/// 1) batch delete (DeleteObjects)
/// 2) upload part copy (UploadPartCopy)
bool support_batch_delete{true};

/// Y.Cloud S3 implementation support proxy for connection
bool support_proxy{false};
};

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);

}
@@ -13,6 +13,7 @@
#include <IO/ReadBufferFromS3.h>
#include <IO/S3/getObjectInfo.h>
#include <IO/S3/copyS3File.h>
#include <IO/S3/deleteFileFromS3.h>
#include <Interpreters/Context.h>
#include <Common/threadPoolCallbackRunner.h>
#include <Core/Settings.h>
@@ -29,7 +30,6 @@

namespace ProfileEvents
{
extern const Event S3DeleteObjects;
extern const Event S3ListObjects;
extern const Event DiskS3DeleteObjects;
extern const Event DiskS3ListObjects;
@@ -73,20 +73,6 @@ void throwIfError(const Aws::Utils::Outcome<Result, Error> & response)
}
}

template <typename Result, typename Error>
void throwIfUnexpectedError(const Aws::Utils::Outcome<Result, Error> & response, bool if_exists)
{
/// In this case even if absence of key may be ok for us,
/// the log will be polluted with error messages from aws sdk.
/// Looks like there is no way to suppress them.

if (!response.IsSuccess() && (!if_exists || !S3::isNotFoundError(response.GetError().GetErrorType())))
{
const auto & err = response.GetError();
throw S3Exception(err.GetErrorType(), "{} (Code: {})", err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
}
}

template <typename Result, typename Error>
void logIfError(const Aws::Utils::Outcome<Result, Error> & response, std::function<String()> && msg)
{
@@ -300,21 +286,11 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet

void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
auto blob_storage_log = BlobStorageLogWriter::create(disk_name);

S3::DeleteObjectRequest request;
request.SetBucket(uri.bucket);
request.SetKey(object.remote_path);
auto outcome = client.get()->DeleteObject(request);
if (auto blob_storage_log = BlobStorageLogWriter::create(disk_name))
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
uri.bucket, object.remote_path, object.local_path, object.bytes_size,
outcome.IsSuccess() ? nullptr : &outcome.GetError());

throwIfUnexpectedError(outcome, if_exists);

LOG_DEBUG(log, "Object with path {} was removed from S3", object.remote_path);
deleteFileFromS3(client.get(), uri.bucket, object.remote_path, if_exists,
blob_storage_log, object.local_path, object.bytes_size,
ProfileEvents::DiskS3DeleteObjects);
}

void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_exists)
@@ -322,59 +298,31 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
if (objects.empty())
return;

if (!s3_capabilities.support_batch_delete)
Strings keys;
keys.reserve(objects.size());
for (const auto & object : objects)
keys.push_back(object.remote_path);

auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
Strings local_paths_for_blob_storage_log;
std::vector<size_t> file_sizes_for_blob_storage_log;
if (blob_storage_log)
{
local_paths_for_blob_storage_log.reserve(objects.size());
file_sizes_for_blob_storage_log.reserve(objects.size());
for (const auto & object : objects)
removeObjectImpl(object, if_exists);
}
else
{
auto settings_ptr = s3_settings.get();

size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
size_t current_position = 0;

auto blob_storage_log = BlobStorageLogWriter::create(disk_name);
while (current_position < objects.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
String keys;
size_t first_position = current_position;
for (; current_position < objects.size() && current_chunk.size() < chunk_size_limit; ++current_position)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(objects[current_position].remote_path);
current_chunk.push_back(obj);

if (!keys.empty())
keys += ", ";
keys += objects[current_position].remote_path;
}

Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);

ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
S3::DeleteObjectsRequest request;
request.SetBucket(uri.bucket);
request.SetDelete(delkeys);
auto outcome = client.get()->DeleteObjects(request);

if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
for (size_t i = first_position; i < current_position; ++i)
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
uri.bucket, objects[i].remote_path, objects[i].local_path, objects[i].bytes_size,
outcome_error, time_now);
}

LOG_DEBUG(log, "Objects with paths [{}] were removed from S3", keys);
throwIfUnexpectedError(outcome, if_exists);
local_paths_for_blob_storage_log.push_back(object.local_path);
file_sizes_for_blob_storage_log.push_back(object.bytes_size);
}
}

auto settings_ptr = s3_settings.get();

deleteFilesFromS3(client.get(), uri.bucket, keys, if_exists,
s3_capabilities, settings_ptr->objects_chunk_size_to_delete,
blob_storage_log, local_paths_for_blob_storage_log, file_sizes_for_blob_storage_log,
ProfileEvents::DiskS3DeleteObjects);
}

void S3ObjectStorage::removeObject(const StoredObject & object)
@@ -5,8 +5,8 @@
#if USE_AWS_S3

#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
#include <memory>
#include <IO/S3/S3Capabilities.h>
#include <IO/S3Settings.h>
#include <Common/MultiVersion.h>
#include <Common/ObjectStorageKeyGenerator.h>
@@ -148,8 +148,6 @@ public:

bool isRemote() const override { return true; }

void setCapabilitiesSupportBatchDelete(bool value) { s3_capabilities.support_batch_delete = value; }

std::unique_ptr<IObjectStorage> cloneObjectStorage(
const std::string & new_namespace,
const Poco::Util::AbstractConfiguration & config,
src/IO/S3/S3Capabilities.cpp (new file, 47 lines)
@@ -0,0 +1,47 @@
#include <IO/S3/S3Capabilities.h>

#include <Common/logger_useful.h>


namespace DB
{

S3Capabilities::S3Capabilities(const S3Capabilities & src)
: S3Capabilities(src.isBatchDeleteSupported(), src.support_proxy)
{
}

std::optional<bool> S3Capabilities::isBatchDeleteSupported() const
{
std::lock_guard lock{mutex};
return support_batch_delete;
}

void S3Capabilities::setIsBatchDeleteSupported(bool support_batch_delete_)
{
std::lock_guard lock{mutex};

if (support_batch_delete.has_value() && (support_batch_delete.value() != support_batch_delete_))
{
LOG_ERROR(getLogger("S3Capabilities"),
"Got different results ({} vs {}) from checking if the cloud storage supports batch delete (DeleteObjects), "
"the cloud storage API may be unstable",
support_batch_delete.value(), support_batch_delete_);
chassert(false && "Got different results from checking if the cloud storage supports batch delete");
}

support_batch_delete = support_batch_delete_;
}

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
std::optional<bool> support_batch_delete;
if (config.has(config_prefix + ".support_batch_delete"))
support_batch_delete = config.getBool(config_prefix + ".support_batch_delete");

bool support_proxy = config.getBool(config_prefix + ".support_proxy", config.has(config_prefix + ".proxy"));

return S3Capabilities{support_batch_delete, support_proxy};
}

}
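getCapabilitiesFromConfig() above now leaves support_batch_delete unset when the `<prefix>.support_batch_delete` key is absent, so the capability is detected at runtime instead of defaulting to true. A standalone sketch of the same key handling (illustration only, built directly on Poco rather than on the ClickHouse config plumbing):

// Sketch: reads the same keys as getCapabilitiesFromConfig() and shows the tri-state default.
// Assumption: standalone example, not part of the ClickHouse build.
#include <Poco/Util/XMLConfiguration.h>
#include <iostream>
#include <optional>
#include <sstream>

int main()
{
    std::istringstream xml("<config><s3><support_batch_delete>false</support_batch_delete></s3></config>");
    Poco::Util::XMLConfiguration config(xml);

    std::optional<bool> support_batch_delete;            // stays nullopt if the key is absent
    if (config.has("s3.support_batch_delete"))
        support_batch_delete = config.getBool("s3.support_batch_delete");

    bool support_proxy = config.getBool("s3.support_proxy", config.has("s3.proxy"));

    if (!support_batch_delete)
        std::cout << "batch delete: detect automatically\n";
    else
        std::cout << "batch delete: " << (*support_batch_delete ? "enabled" : "disabled") << '\n';
    std::cout << "proxy: " << (support_proxy ? "enabled" : "disabled") << '\n';
}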
src/IO/S3/S3Capabilities.h (new file, 48 lines)
@@ -0,0 +1,48 @@
#pragma once

#include <base/defines.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <mutex>
#include <optional>
#include <string>


namespace DB
{

/// Supported/unsupported features by different S3 implementations
/// Can be useful only for almost compatible with AWS S3 versions.
class S3Capabilities
{
public:
explicit S3Capabilities(std::optional<bool> support_batch_delete_ = {}, bool support_proxy_ = false)
: support_proxy(support_proxy_), support_batch_delete(support_batch_delete_)
{
}

S3Capabilities(const S3Capabilities & src);

/// Google S3 implementation doesn't support batch delete
/// TODO: possibly we have to use Google SDK https://github.com/googleapis/google-cloud-cpp/tree/main/google/cloud/storage
/// because looks like it misses some features:
/// 1) batch delete (DeleteObjects)
/// 2) upload part copy (UploadPartCopy)
/// If `isBatchDeleteSupported()` returns `nullopt` it means that it isn't clear yet if it's supported or not
/// and should be detected automatically from responses of the cloud storage.
std::optional<bool> isBatchDeleteSupported() const;
void setIsBatchDeleteSupported(bool support_batch_delete_);

/// Y.Cloud S3 implementation support proxy for connection
const bool support_proxy{false};

private:
/// `support_batch_delete` is guarded by mutex because function deleteFilesFromS3() can update this field from another thread.
/// If `support_batch_delete == nullopt` that means it's not clear yet if it's supported or not.
std::optional<bool> support_batch_delete TSA_GUARDED_BY(mutex);

mutable std::mutex mutex;
};

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);

}
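The header notes that support_batch_delete is guarded by a mutex because deleteFilesFromS3() may record the detection result from several threads at once. A simplified stand-in for that pattern (not the ClickHouse class itself), showing the tri-state flag and the conflicting-result warning:

// Simplified stand-in for the guarded optional<bool> capability flag used above.
// Assumption: illustration only, names and messages are not ClickHouse's.
#include <iostream>
#include <mutex>
#include <optional>
#include <thread>

class BatchDeleteCapability
{
public:
    std::optional<bool> get() const
    {
        std::lock_guard lock{mutex};
        return supported;
    }

    void set(bool value)
    {
        std::lock_guard lock{mutex};
        if (supported.has_value() && *supported != value)
            std::cerr << "Conflicting detection results, the storage API may be unstable\n";
        supported = value;
    }

private:
    std::optional<bool> supported;   // nullopt until the first real response is seen
    mutable std::mutex mutex;
};

int main()
{
    BatchDeleteCapability capability;
    std::thread t1([&] { capability.set(true); });   // e.g. a successful DeleteObjects
    std::thread t2([&] { capability.set(true); });   // another thread observing the same result
    t1.join();
    t2.join();
    std::cout << "batch delete supported: " << *capability.get() << '\n';
}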
src/IO/S3/deleteFileFromS3.cpp (new file, 274 lines)
@@ -0,0 +1,274 @@
#include <IO/S3/deleteFileFromS3.h>

#if USE_AWS_S3

#include <Common/logger_useful.h>
#include <IO/S3/Client.h>
#include <IO/S3/Requests.h>
#include <IO/S3/BlobStorageLogWriter.h>
#include <IO/S3/S3Capabilities.h>
#include <IO/S3/getObjectInfo.h>


namespace ProfileEvents
{
extern const Event S3DeleteObjects;
}


namespace DB
{

void deleteFileFromS3(
const std::shared_ptr<const S3::Client> & s3_client,
const String & bucket,
const String & key,
bool if_exists,
BlobStorageLogWriterPtr blob_storage_log,
const String & local_path_for_blob_storage_log,
size_t file_size_for_blob_storage_log,
std::optional<ProfileEvents::Event> profile_event)
{
S3::DeleteObjectRequest request;
request.SetBucket(bucket);
request.SetKey(key);

ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
if (profile_event && *profile_event != ProfileEvents::S3DeleteObjects)
ProfileEvents::increment(*profile_event);

auto outcome = s3_client->DeleteObject(request);

auto log = getLogger("deleteFileFromS3");

if (blob_storage_log)
{
LOG_TRACE(log, "Writing Delete operation for blob {}", key);
blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, key,
local_path_for_blob_storage_log, file_size_for_blob_storage_log,
outcome.IsSuccess() ? nullptr : &outcome.GetError());
}
else
{
LOG_TRACE(log, "No blob storage log, not writing blob {}", key);
}

if (outcome.IsSuccess())
{
LOG_INFO(log, "Object with path {} was removed from S3", key);
}
else if (if_exists && S3::isNotFoundError(outcome.GetError().GetErrorType()))
{
/// In this case even if absence of key may be ok for us, the log will be polluted with error messages from aws sdk.
/// Looks like there is no way to suppress them.
LOG_TRACE(log, "Object with path {} was skipped because it didn't exist", key);
}
else
{
const auto & err = outcome.GetError();
throw S3Exception(err.GetErrorType(), "{} (Code: {}) while removing object with path {} from S3",
err.GetMessage(), static_cast<size_t>(err.GetErrorType()), key);
}
}


void deleteFilesFromS3(
const std::shared_ptr<const S3::Client> & s3_client,
const String & bucket,
const Strings & keys,
bool if_exists,
S3Capabilities & s3_capabilities,
size_t batch_size,
BlobStorageLogWriterPtr blob_storage_log,
const Strings & local_paths_for_blob_storage_log,
const std::vector<size_t> & file_sizes_for_blob_storage_log,
std::optional<ProfileEvents::Event> profile_event)
{
chassert(local_paths_for_blob_storage_log.empty() || (local_paths_for_blob_storage_log.size() == keys.size()));
chassert(file_sizes_for_blob_storage_log.empty() || (file_sizes_for_blob_storage_log.size() == keys.size()));

if (keys.empty())
return; /// Nothing to delete.

/// We're trying batch delete (DeleteObjects) first.
bool try_batch_delete = true;
{
if (keys.size() == 1)
try_batch_delete = false; /// We're deleting one file - there is no need for batch delete.
else if (batch_size < 2)
try_batch_delete = false; /// Can't do batch delete with such small batches.
else if (auto support_batch_delete = s3_capabilities.isBatchDeleteSupported();
support_batch_delete.has_value() && !support_batch_delete.value())
try_batch_delete = false; /// Support for batch delete is disabled.
}

auto log = getLogger("deleteFileFromS3");
const String empty_string;

if (try_batch_delete)
{
bool need_retry_with_plain_delete_object = false;
size_t current_position = 0;

while (current_position < keys.size())
{
std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
String comma_separated_keys;
size_t first_position = current_position;
for (; current_position < keys.size() && current_chunk.size() < batch_size; ++current_position)
{
Aws::S3::Model::ObjectIdentifier obj;
obj.SetKey(keys[current_position]);
current_chunk.push_back(obj);

if (!comma_separated_keys.empty())
comma_separated_keys += ", ";
comma_separated_keys += keys[current_position];
}

Aws::S3::Model::Delete delkeys;
delkeys.SetObjects(current_chunk);
delkeys.SetQuiet(true);

S3::DeleteObjectsRequest request;
request.SetBucket(bucket);
request.SetDelete(delkeys);

ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
if (profile_event && *profile_event != ProfileEvents::S3DeleteObjects)
ProfileEvents::increment(*profile_event);

auto outcome = s3_client->DeleteObjects(request);

if (blob_storage_log)
{
const auto * outcome_error = outcome.IsSuccess() ? nullptr : &outcome.GetError();
auto time_now = std::chrono::system_clock::now();
LOG_TRACE(log, "Writing Delete operation for blobs [{}]", comma_separated_keys);
for (size_t i = first_position; i < current_position; ++i)
{
const String & local_path_for_blob_storage_log = (i < local_paths_for_blob_storage_log.size()) ? local_paths_for_blob_storage_log[i] : empty_string;
size_t file_size_for_blob_storage_log = (i < file_sizes_for_blob_storage_log.size()) ? file_sizes_for_blob_storage_log[i] : 0;

blob_storage_log->addEvent(BlobStorageLogElement::EventType::Delete,
bucket, keys[i],
local_path_for_blob_storage_log, file_size_for_blob_storage_log,
outcome_error, time_now);
}
}
else
{
LOG_TRACE(log, "No blob storage log, not writing blobs [{}]", comma_separated_keys);
}

if (outcome.IsSuccess())
{
/// DeleteObjects succeeded, that means some objects were removed (but maybe not all the objects).
/// Multiple threads can call deleteFilesFromS3() with a reference to the same `s3_capabilities`,
/// and the following line doesn't cause a race because `s3_capabilities` is protected with mutex.
s3_capabilities.setIsBatchDeleteSupported(true);

const auto & errors = outcome.GetResult().GetErrors();
if (errors.empty())
{
/// All the objects were removed.
LOG_INFO(log, "Objects with paths [{}] were removed from S3", comma_separated_keys);
}
else
{
/// Mixed success/error response - some objects were removed, and some were not.
/// We need to extract more detailed information from the outcome.
std::unordered_set<std::string_view> removed_keys{keys.begin(), keys.end()};
String not_found_keys;
std::exception_ptr other_error;

for (const auto & err : errors)
{
removed_keys.erase(err.GetKey());
auto error_type = static_cast<Aws::S3::S3Errors>(Aws::S3::S3ErrorMapper::GetErrorForName(err.GetCode().c_str()).GetErrorType());
if (if_exists && S3::isNotFoundError(error_type))
{
if (!not_found_keys.empty())
not_found_keys += ", ";
not_found_keys += err.GetKey();
}
else if (!other_error)
{
other_error = std::make_exception_ptr(
S3Exception{error_type, "{} (Code: {}) while removing object with path {} from S3",
err.GetMessage(), err.GetCode(), err.GetKey()});
}
}

if (!removed_keys.empty())
{
String removed_keys_comma_separated;
for (const auto & key : removed_keys)
{
if (!removed_keys_comma_separated.empty())
removed_keys_comma_separated += ", ";
removed_keys_comma_separated += key;
}
LOG_INFO(log, "Objects with paths [{}] were removed from S3", removed_keys_comma_separated);
}

if (!not_found_keys.empty())
{
/// In this case even if absence of key may be ok for us, the log will be polluted with error messages from aws sdk.
/// Looks like there is no way to suppress them.
LOG_TRACE(log, "Object with paths [{}] were skipped because they didn't exist", not_found_keys);
}

if (other_error)
std::rethrow_exception(other_error);
}
}
else
{
/// DeleteObjects didn't succeed, that means either a) this operation isn't supported at all;
/// or b) all the objects didn't exist; or c) some failure occurred.
const auto & err = outcome.GetError();
if ((err.GetExceptionName() == "InvalidRequest") || (err.GetExceptionName() == "InvalidArgument")
|| (err.GetExceptionName() == "NotImplemented"))
{
LOG_TRACE(log, "DeleteObjects is not supported: {} (Code: {}). Retrying with plain DeleteObject.",
err.GetMessage(), static_cast<size_t>(err.GetErrorType()));
/// Multiple threads can call deleteFilesFromS3() with a reference to the same `s3_capabilities`,
/// and the following line doesn't cause a race because `s3_capabilities` is protected with mutex.
s3_capabilities.setIsBatchDeleteSupported(false);
need_retry_with_plain_delete_object = true;
break;
}

if (if_exists && S3::isNotFoundError(err.GetErrorType()))
{
LOG_TRACE(log, "Object with paths [{}] were skipped because they didn't exist", comma_separated_keys);
}
else
{
throw S3Exception(err.GetErrorType(), "{} (Code: {}) while removing objects with paths [{}] from S3",
err.GetMessage(), static_cast<size_t>(err.GetErrorType()), comma_separated_keys);
}
}
}

if (!need_retry_with_plain_delete_object)
return;
}

/// Batch delete (DeleteObjects) isn't supported so we'll delete all the files sequentially.
for (size_t i = 0; i != keys.size(); ++i)
{
const String & local_path_for_blob_storage_log = (i < local_paths_for_blob_storage_log.size()) ? local_paths_for_blob_storage_log[i] : empty_string;
size_t file_size_for_blob_storage_log = (i < file_sizes_for_blob_storage_log.size()) ? file_sizes_for_blob_storage_log[i] : 0;

deleteFileFromS3(s3_client, bucket, keys[i], if_exists,
blob_storage_log, local_path_for_blob_storage_log, file_size_for_blob_storage_log,
profile_event);
}
}

}

#endif
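One detail worth noting in deleteFilesFromS3() above: a successful DeleteObjects response can still carry per-key errors, so the code distinguishes keys that simply didn't exist (tolerated when if_exists is set) from real failures, and rethrows only the first real one. A standalone model of that classification (the KeyError struct and error strings are stand-ins for the AWS SDK error objects):

// Sketch of the per-key error classification inside a successful batch-delete response.
// Assumption: standalone model, not the AWS SDK types.
#include <iostream>
#include <optional>
#include <string>
#include <vector>

struct KeyError { std::string key; std::string code; };

int main()
{
    bool if_exists = true;
    std::vector<KeyError> errors = {{"a.txt", "NoSuchKey"}, {"b.txt", "AccessDenied"}};

    std::string not_found_keys;
    std::optional<std::string> first_real_error;

    for (const auto & err : errors)
    {
        if (if_exists && err.code == "NoSuchKey")
        {
            if (!not_found_keys.empty())
                not_found_keys += ", ";
            not_found_keys += err.key;        // tolerated: the object was already gone
        }
        else if (!first_real_error)
            first_real_error = err.code + " while removing " + err.key;  // kept for rethrow
    }

    if (!not_found_keys.empty())
        std::cout << "skipped (didn't exist): " << not_found_keys << '\n';
    if (first_real_error)
        std::cout << "would rethrow: " << *first_real_error << '\n';
}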
src/IO/S3/deleteFileFromS3.h (new file, 51 lines)
@@ -0,0 +1,51 @@
#pragma once

#include "config.h"

#if USE_AWS_S3

#include <Common/ProfileEvents.h>
#include <Core/Types.h>
#include <memory>


namespace DB
{

namespace S3
{
class Client;
}

class S3Capabilities;
class BlobStorageLogWriter;
using BlobStorageLogWriterPtr = std::shared_ptr<BlobStorageLogWriter>;


/// Deletes one file from S3.
void deleteFileFromS3(
const std::shared_ptr<const S3::Client> & s3_client,
const String & bucket,
const String & key,
bool if_exists = false,
BlobStorageLogWriterPtr blob_storage_log = nullptr,
const String & local_path_for_blob_storage_log = {},
size_t file_size_for_blob_storage_log = 0,
std::optional<ProfileEvents::Event> profile_event = std::nullopt);

/// Deletes multiple files from S3 using batch requests when it's possible.
void deleteFilesFromS3(
const std::shared_ptr<const S3::Client> & s3_client,
const String & bucket,
const Strings & keys,
bool if_exists,
S3Capabilities & s3_capabilities,
size_t batch_size = 1000,
BlobStorageLogWriterPtr blob_storage_log = nullptr,
const Strings & local_paths_for_blob_storage_log = {},
const std::vector<size_t> & file_sizes_for_blob_storage_log = {},
std::optional<ProfileEvents::Event> profile_event = std::nullopt);

}

#endif
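The declarations above define the calling convention used by callers such as S3ObjectStorage::removeObjectsImpl() and BackupWriterS3::removeFiles(): local_paths_for_blob_storage_log and file_sizes_for_blob_storage_log are parallel to keys and must be either empty or exactly the same length (the chasserts in the .cpp enforce this). A small sketch of assembling those parallel vectors (StoredFile is a hypothetical stand-in for ClickHouse's StoredObject, and the final loop stands in for the actual deleteFilesFromS3 call):

// Sketch of preparing the parallel key/path/size vectors for a batch delete.
// Assumption: StoredFile and the printing loop are stand-ins, not ClickHouse code.
#include <cstddef>
#include <iostream>
#include <string>
#include <vector>

struct StoredFile { std::string remote_path; std::string local_path; size_t bytes_size; };

int main()
{
    std::vector<StoredFile> objects = {{"data/a.bin", "/backup/a.bin", 128}, {"data/b.bin", "/backup/b.bin", 256}};

    std::vector<std::string> keys, local_paths;
    std::vector<size_t> sizes;
    keys.reserve(objects.size());
    local_paths.reserve(objects.size());
    sizes.reserve(objects.size());

    for (const auto & object : objects)
    {
        keys.push_back(object.remote_path);
        local_paths.push_back(object.local_path);   // only needed when a blob storage log is used
        sizes.push_back(object.bytes_size);
    }

    // Here the real code would call something like:
    // deleteFilesFromS3(client, bucket, keys, /* if_exists = */ false, s3_capabilities,
    //                   batch_size, blob_storage_log, local_paths, sizes, profile_event);
    for (size_t i = 0; i != keys.size(); ++i)
        std::cout << keys[i] << " (" << local_paths[i] << ", " << sizes[i] << " bytes)\n";
}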
@@ -66,7 +66,8 @@ namespace

bool isNotFoundError(Aws::S3::S3Errors error)
{
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY;
return error == Aws::S3::S3Errors::RESOURCE_NOT_FOUND || error == Aws::S3::S3Errors::NO_SUCH_KEY
|| error == Aws::S3::S3Errors::NO_SUCH_BUCKET;
}

ObjectInfo getObjectInfo(
@@ -144,11 +144,7 @@ ObjectStoragePtr StorageS3Configuration::createObjectStorage(ContextPtr context,

auto client = getClient(url, *s3_settings, context, /* for_disk_s3 */false);
auto key_generator = createObjectStorageKeysGeneratorAsIsWithPrefix(url.key);
auto s3_capabilities = S3Capabilities
{
.support_batch_delete = config.getBool("s3.support_batch_delete", true),
.support_proxy = config.getBool("s3.support_proxy", config.has("s3.proxy")),
};
auto s3_capabilities = getCapabilitiesFromConfig(config, "s3");

return std::make_shared<S3ObjectStorage>(
std::move(client), std::move(s3_settings), url, s3_capabilities,
@@ -32,6 +32,7 @@ def request(command, url, headers={}, data=None):


class RequestHandler(http.server.BaseHTTPRequestHandler):
    # GetObject
    def do_GET(self):
        if self.path == "/":
            self.send_response(200)
@@ -41,12 +42,18 @@ class RequestHandler(http.server.BaseHTTPRequestHandler):
        else:
            self.do_HEAD()

    # PutObject
    def do_PUT(self):
        self.do_HEAD()

    # DeleteObjects (/root?delete)
    def do_POST(self):
        self.do_HEAD()

    # DeleteObject
    def do_DELETE(self):
        self.do_HEAD()

    def do_HEAD(self):
        content_length = self.headers.get("Content-Length")
        data = self.rfile.read(int(content_length)) if content_length else None