Merge pull request #37659 from frew/master

Support `batch_delete` capability for GCS
alesapin 2022-06-13 13:39:01 +02:00 committed by GitHub
commit 06d94a4dde
5 changed files with 104 additions and 32 deletions
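
The change lets a GCS bucket be attached through the regular S3 disk type by switching batch deletes off in the disk configuration. As a rough, self-contained sketch of the two new keys (everything here except `support_batch_delete` and `support_proxy` is made up for illustration; the parsing mirrors `getCapabilitiesFromConfig` from the first file below):

// Illustrative only (not ClickHouse code): parse a hypothetical "gcs" disk
// section the same way getCapabilitiesFromConfig() does.
#include <iostream>
#include <sstream>
#include <string>

#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>

int main()
{
    // Made-up disk configuration; only support_batch_delete / support_proxy
    // are keys introduced by this PR.
    std::istringstream xml(
        "<disks>"
        "  <gcs>"
        "    <type>s3</type>"
        "    <endpoint>https://storage.googleapis.com/my-bucket/data/</endpoint>"
        "    <support_batch_delete>false</support_batch_delete>"
        "  </gcs>"
        "</disks>");

    Poco::AutoPtr<Poco::Util::XMLConfiguration> config(new Poco::Util::XMLConfiguration(xml));

    const std::string config_prefix = "gcs";
    const bool support_batch_delete = config->getBool(config_prefix + ".support_batch_delete", true);
    const bool support_proxy = config->getBool(config_prefix + ".support_proxy", config->has(config_prefix + ".proxy"));

    std::cout << "support_batch_delete=" << support_batch_delete
              << ", support_proxy=" << support_proxy << '\n';
    return 0;
}

When the key is absent, the default stays `true`, so existing AWS-backed disks keep using batched DeleteObjects requests.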


@@ -0,0 +1,15 @@
#include <Disks/ObjectStorages/S3/S3Capabilities.h>

namespace DB
{

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix)
{
    return S3Capabilities
    {
        .support_batch_delete = config.getBool(config_prefix + ".support_batch_delete", true),
        .support_proxy = config.getBool(config_prefix + ".support_proxy", config.has(config_prefix + ".proxy")),
    };
}

}


@@ -0,0 +1,27 @@
#pragma once

#include <string>
#include <Poco/Util/AbstractConfiguration.h>

namespace DB
{

/// Features supported or not supported by different S3 implementations.
/// Only useful for implementations that are almost, but not fully, compatible with AWS S3.
struct S3Capabilities
{
    /// Google's S3 implementation (GCS) doesn't support batch delete.
    /// TODO: possibly we should use the Google SDK https://github.com/googleapis/google-cloud-cpp/tree/main/google/cloud/storage
    /// instead, because the S3-compatible endpoint seems to be missing a number of features:
    /// 1) batch delete
    /// 2) list_v2
    /// 3) multipart upload works differently
    bool support_batch_delete{true};

    /// The Yandex Cloud S3 implementation supports a proxy for the connection.
    bool support_proxy{false};
};

S3Capabilities getCapabilitiesFromConfig(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix);

}
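
For context on the TODO above: going through google-cloud-cpp instead of the S3-compatible endpoint would make a single-object delete look roughly like this. A hypothetical sketch, not part of this PR; the bucket and object names are placeholders.

// Rough sketch of the google-cloud-cpp route mentioned in the TODO above;
// not part of this commit. Bucket/object names are placeholders.
#include <iostream>

#include <google/cloud/storage/client.h>

int main()
{
    namespace gcs = ::google::cloud::storage;

    // Uses Application Default Credentials.
    auto client = gcs::Client();

    // The C++ client deletes objects one at a time; there is no
    // DeleteObjects-style batch call here.
    google::cloud::Status status = client.DeleteObject("my-bucket", "data/some_object");
    if (!status.ok())
    {
        std::cerr << "DeleteObject failed: " << status << "\n";
        return 1;
    }
    return 0;
}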


@@ -17,6 +17,7 @@
 #include <aws/s3/model/CopyObjectRequest.h>
 #include <aws/s3/model/ListObjectsV2Request.h>
 #include <aws/s3/model/HeadObjectRequest.h>
+#include <aws/s3/model/DeleteObjectRequest.h>
 #include <aws/s3/model/DeleteObjectsRequest.h>
 #include <aws/s3/model/CreateMultipartUploadRequest.h>
 #include <aws/s3/model/CompleteMultipartUploadRequest.h>
@@ -213,18 +214,34 @@ void S3ObjectStorage::listPrefix(const std::string & path, BlobsPathToSize & chi
 void S3ObjectStorage::removeObject(const std::string & path)
 {
     auto client_ptr = client.get();
-    Aws::S3::Model::ObjectIdentifier obj;
-    obj.SetKey(path);
-
-    Aws::S3::Model::Delete delkeys;
-    delkeys.SetObjects({obj});
-
-    Aws::S3::Model::DeleteObjectsRequest request;
-    request.SetBucket(bucket);
-    request.SetDelete(delkeys);
-    auto outcome = client_ptr->DeleteObjects(request);
-
-    throwIfError(outcome);
+    auto settings_ptr = s3_settings.get();
+
+    /// If batch deletes are not supported, fall back to a plain DeleteObject request.
+    /// This allows us to work with GCS, which doesn't support DeleteObjects.
+    if (!s3_capabilities.support_batch_delete)
+    {
+        Aws::S3::Model::DeleteObjectRequest request;
+        request.SetBucket(bucket);
+        request.SetKey(path);
+        auto outcome = client_ptr->DeleteObject(request);
+        throwIfError(outcome);
+    }
+    else
+    {
+        /// TODO: For AWS we prefer the multi-object operation even for a single object;
+        /// maybe we shouldn't?
+        Aws::S3::Model::ObjectIdentifier obj;
+        obj.SetKey(path);
+
+        Aws::S3::Model::Delete delkeys;
+        delkeys.SetObjects({obj});
+
+        Aws::S3::Model::DeleteObjectsRequest request;
+        request.SetBucket(bucket);
+        request.SetDelete(delkeys);
+        auto outcome = client_ptr->DeleteObjects(request);
+        throwIfError(outcome);
+    }
 }
 
 void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
@@ -235,31 +252,39 @@ void S3ObjectStorage::removeObjects(const std::vector<std::string> & paths)
     auto client_ptr = client.get();
     auto settings_ptr = s3_settings.get();
 
-    size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
-    size_t current_position = 0;
-    while (current_position < paths.size())
-    {
-        std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
-        String keys;
-        for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
-        {
-            Aws::S3::Model::ObjectIdentifier obj;
-            obj.SetKey(paths[current_position]);
-            current_chunk.push_back(obj);
-
-            if (!keys.empty())
-                keys += ", ";
-            keys += paths[current_position];
-        }
-
-        Aws::S3::Model::Delete delkeys;
-        delkeys.SetObjects(current_chunk);
-
-        Aws::S3::Model::DeleteObjectsRequest request;
-        request.SetBucket(bucket);
-        request.SetDelete(delkeys);
-        auto outcome = client_ptr->DeleteObjects(request);
-        throwIfError(outcome);
+    if (!s3_capabilities.support_batch_delete)
+    {
+        for (const auto & path : paths)
+            removeObject(path);
+    }
+    else
+    {
+        size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
+        size_t current_position = 0;
+        while (current_position < paths.size())
+        {
+            std::vector<Aws::S3::Model::ObjectIdentifier> current_chunk;
+            String keys;
+            for (; current_position < paths.size() && current_chunk.size() < chunk_size_limit; ++current_position)
+            {
+                Aws::S3::Model::ObjectIdentifier obj;
+                obj.SetKey(paths[current_position]);
+                current_chunk.push_back(obj);
+
+                if (!keys.empty())
+                    keys += ", ";
+                keys += paths[current_position];
+            }
+
+            Aws::S3::Model::Delete delkeys;
+            delkeys.SetObjects(current_chunk);
+
+            Aws::S3::Model::DeleteObjectsRequest request;
+            request.SetBucket(bucket);
+            request.SetDelete(delkeys);
+            auto outcome = client_ptr->DeleteObjects(request);
+            throwIfError(outcome);
+        }
     }
 }
@@ -493,7 +518,7 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(const std::s
     return std::make_unique<S3ObjectStorage>(
         nullptr, getClient(config, config_prefix, context),
         getSettings(config, config_prefix, context),
-        version_id, new_namespace);
+        version_id, s3_capabilities, new_namespace);
 }
 
 }
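
The non-batch path above is just a per-object DeleteObject call. Outside ClickHouse, the same AWS SDK call can be pointed at GCS's S3-compatible endpoint; a minimal hypothetical sketch, assuming credentials come from the default provider chain (for GCS, HMAC interoperability keys) and with the endpoint, bucket, and key made up:

// Standalone sketch, not ClickHouse code: single-object delete via the AWS SDK
// against an S3-compatible endpoint. Endpoint/bucket/key are placeholders.
#include <iostream>

#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/DeleteObjectRequest.h>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        Aws::Client::ClientConfiguration config;
        // GCS interoperability mode; credentials are picked up from the
        // default provider chain (e.g. HMAC keys in environment variables).
        config.endpointOverride = "https://storage.googleapis.com";

        Aws::S3::S3Client client(config);

        Aws::S3::Model::DeleteObjectRequest request;
        request.SetBucket("my-bucket");
        request.SetKey("data/some_object");

        auto outcome = client.DeleteObject(request);
        if (!outcome.IsSuccess())
            std::cerr << "DeleteObject failed: " << outcome.GetError().GetMessage() << "\n";
    }
    Aws::ShutdownAPI(options);
    return 0;
}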


@@ -5,6 +5,7 @@
 #if USE_AWS_S3
 
 #include <Disks/ObjectStorages/IObjectStorage.h>
+#include <Disks/ObjectStorages/S3/S3Capabilities.h>
 #include <memory>
 #include <aws/s3/S3Client.h>
 #include <aws/s3/model/HeadObjectResult.h>
@@ -46,11 +47,13 @@ public:
         std::unique_ptr<Aws::S3::S3Client> && client_,
         std::unique_ptr<S3ObjectStorageSettings> && s3_settings_,
         String version_id_,
+        const S3Capabilities & s3_capabilities_,
         String bucket_)
         : IObjectStorage(std::move(cache_))
         , bucket(bucket_)
         , client(std::move(client_))
         , s3_settings(std::move(s3_settings_))
+        , s3_capabilities(s3_capabilities_)
         , version_id(std::move(version_id_))
     {}
 
@@ -129,6 +132,7 @@ private:
 
     MultiVersion<Aws::S3::S3Client> client;
     MultiVersion<S3ObjectStorageSettings> s3_settings;
+    const S3Capabilities s3_capabilities;
     const String version_id;
 };


@@ -89,11 +89,12 @@ void registerDiskS3(DiskFactory & factory)
        auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, uri.key);
 
        FileCachePtr cache = getCachePtrForDisk(name, config, config_prefix, context);
 
+       S3Capabilities s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
        ObjectStoragePtr s3_storage = std::make_unique<S3ObjectStorage>(
            std::move(cache), getClient(config, config_prefix, context),
            getSettings(config, config_prefix, context),
-           uri.version_id, uri.bucket);
+           uri.version_id, s3_capabilities, uri.bucket);
 
        bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
        uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);