mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-25 17:12:03 +00:00
Use new copy s3 functions in S3ObjectStorage.
This commit is contained in:
parent
f0fda580d0
commit
5ceb64accc
@ -8,7 +8,7 @@
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/HTTPHeaderEntries.h>
|
||||
#include <IO/S3/copyDataToS3.h>
|
||||
#include <IO/S3/copyS3File.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
|
||||
#include <aws/core/auth/AWSCredentials.h>
|
||||
@ -167,16 +167,16 @@ void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_na
|
||||
auto object_storage = src_disk->getObjectStorage();
|
||||
std::string src_bucket = object_storage->getObjectsNamespace();
|
||||
auto file_path = fs::path(s3_uri.key) / dest_file_name;
|
||||
copyFileS3ToS3(client, src_bucket, objects[0].absolute_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
|
||||
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
|
||||
copyS3File(client, src_bucket, objects[0].absolute_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
|
||||
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
|
||||
}
|
||||
}
|
||||
|
||||
void BackupWriterS3::copyDataToFile(
|
||||
const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
|
||||
{
|
||||
copyDataToS3(create_read_buffer, offset, size, client, s3_uri.bucket, fs::path(s3_uri.key) / dest_file_name, request_settings, {},
|
||||
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
|
||||
copyDataToS3File(create_read_buffer, offset, size, client, s3_uri.bucket, fs::path(s3_uri.key) / dest_file_name, request_settings, {},
|
||||
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
|
||||
}
|
||||
|
||||
BackupWriterS3::~BackupWriterS3() = default;
|
||||
|
@ -16,18 +16,13 @@
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/SeekAvoidingReadBuffer.h>
|
||||
#include <IO/S3/copyS3File.h>
|
||||
#include <Interpreters/threadPoolCallbackRunner.h>
|
||||
#include <Disks/ObjectStorages/S3/diskSettings.h>
|
||||
|
||||
#include <aws/s3/model/CopyObjectRequest.h>
|
||||
#include <aws/s3/model/ListObjectsV2Request.h>
|
||||
#include <aws/s3/model/HeadObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectRequest.h>
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h>
|
||||
#include <aws/s3/model/CreateMultipartUploadRequest.h>
|
||||
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
|
||||
#include <aws/s3/model/UploadPartCopyRequest.h>
|
||||
#include <aws/s3/model/AbortMultipartUploadRequest.h>
|
||||
|
||||
#include <Common/getRandomASCIIString.h>
|
||||
#include <Common/StringUtils/StringUtils.h>
|
||||
@ -39,22 +34,9 @@
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event S3DeleteObjects;
|
||||
extern const Event S3HeadObject;
|
||||
extern const Event S3ListObjects;
|
||||
extern const Event S3CopyObject;
|
||||
extern const Event S3CreateMultipartUpload;
|
||||
extern const Event S3UploadPartCopy;
|
||||
extern const Event S3AbortMultipartUpload;
|
||||
extern const Event S3CompleteMultipartUpload;
|
||||
|
||||
extern const Event DiskS3DeleteObjects;
|
||||
extern const Event DiskS3HeadObject;
|
||||
extern const Event DiskS3ListObjects;
|
||||
extern const Event DiskS3CopyObject;
|
||||
extern const Event DiskS3CreateMultipartUpload;
|
||||
extern const Event DiskS3UploadPartCopy;
|
||||
extern const Event DiskS3AbortMultipartUpload;
|
||||
extern const Event DiskS3CompleteMultipartUpload;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -125,21 +107,11 @@ std::string S3ObjectStorage::generateBlobNameForPath(const std::string & /* path
|
||||
getRandomASCIIString(key_name_total_size - key_name_prefix_size));
|
||||
}
|
||||
|
||||
size_t S3ObjectStorage::getObjectSize(const std::string & bucket_from, const std::string & key) const
|
||||
{
|
||||
return S3::getObjectSize(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
bool S3ObjectStorage::exists(const StoredObject & object) const
|
||||
{
|
||||
return S3::objectExists(*client.get(), bucket, object.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const
|
||||
{
|
||||
return S3::checkObjectExists(*client.get(), bucket_from, key, {}, /* for_disk_s3= */ true, description);
|
||||
}
|
||||
|
||||
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
|
||||
const StoredObjects & objects,
|
||||
const ReadSettings & read_settings,
|
||||
@ -431,7 +403,12 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
/// Shortcut for S3
|
||||
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
|
||||
{
|
||||
copyObjectImpl(bucket, object_from.absolute_path, dest_s3->bucket, object_to.absolute_path, {}, object_to_attributes);
|
||||
auto client_ptr = client.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(client_ptr, bucket, object_from.absolute_path, 0, size, dest_s3->bucket, object_to.absolute_path,
|
||||
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -439,148 +416,15 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
|
||||
}
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObjectImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
size_t size,
|
||||
std::optional<ObjectAttributes> metadata) const
|
||||
{
|
||||
auto client_ptr = client.get();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3CopyObject);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(src_bucket + "/" + src_key);
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
{
|
||||
request.SetMetadata(*metadata);
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
}
|
||||
|
||||
auto outcome = client_ptr->CopyObject(request);
|
||||
|
||||
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|
||||
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
|
||||
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
|
||||
return;
|
||||
}
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
auto settings_ptr = s3_settings.get();
|
||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObjectMultipartImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
size_t size,
|
||||
std::optional<ObjectAttributes> metadata) const
|
||||
{
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto client_ptr = client.get();
|
||||
|
||||
String multipart_upload_id;
|
||||
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
|
||||
Aws::S3::Model::CreateMultipartUploadRequest request;
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
request.SetMetadata(*metadata);
|
||||
|
||||
auto outcome = client_ptr->CreateMultipartUpload(request);
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
multipart_upload_id = outcome.GetResult().GetUploadId();
|
||||
}
|
||||
|
||||
std::vector<String> part_tags;
|
||||
|
||||
size_t upload_part_size = settings_ptr->request_settings.getUploadSettings().min_upload_part_size;
|
||||
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
|
||||
Aws::S3::Model::UploadPartCopyRequest part_request;
|
||||
part_request.SetCopySource(src_bucket + "/" + src_key);
|
||||
part_request.SetBucket(dst_bucket);
|
||||
part_request.SetKey(dst_key);
|
||||
part_request.SetUploadId(multipart_upload_id);
|
||||
part_request.SetPartNumber(static_cast<int>(part_number));
|
||||
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, std::min(size, position + upload_part_size) - 1));
|
||||
|
||||
auto outcome = client_ptr->UploadPartCopy(part_request);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3AbortMultipartUpload);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3AbortMultipartUpload);
|
||||
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
|
||||
abort_request.SetBucket(dst_bucket);
|
||||
abort_request.SetKey(dst_key);
|
||||
abort_request.SetUploadId(multipart_upload_id);
|
||||
client_ptr->AbortMultipartUpload(abort_request);
|
||||
// In error case we throw exception later with first error from UploadPartCopy
|
||||
}
|
||||
throwIfError(outcome);
|
||||
|
||||
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
|
||||
part_tags.push_back(etag);
|
||||
}
|
||||
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
|
||||
Aws::S3::Model::CompleteMultipartUploadRequest req;
|
||||
req.SetBucket(dst_bucket);
|
||||
req.SetKey(dst_key);
|
||||
req.SetUploadId(multipart_upload_id);
|
||||
|
||||
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
|
||||
for (size_t i = 0; i < part_tags.size(); ++i)
|
||||
{
|
||||
Aws::S3::Model::CompletedPart part;
|
||||
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast<int>(i) + 1));
|
||||
}
|
||||
|
||||
req.SetMultipartUpload(multipart_upload);
|
||||
|
||||
auto outcome = client_ptr->CompleteMultipartUpload(req);
|
||||
|
||||
throwIfError(outcome);
|
||||
}
|
||||
|
||||
if (settings_ptr->request_settings.check_objects_after_upload)
|
||||
checkObjectExists(dst_bucket, dst_key, "Immediately after upload");
|
||||
}
|
||||
|
||||
void S3ObjectStorage::copyObject( // NOLINT
|
||||
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
|
||||
{
|
||||
auto size = getObjectSize(bucket, object_from.absolute_path);
|
||||
static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
|
||||
|
||||
if (size >= multipart_upload_threashold)
|
||||
{
|
||||
copyObjectMultipartImpl(
|
||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
|
||||
}
|
||||
else
|
||||
{
|
||||
copyObjectImpl(
|
||||
bucket, object_from.absolute_path, bucket, object_to.absolute_path, size, object_to_attributes);
|
||||
}
|
||||
auto client_ptr = client.get();
|
||||
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.absolute_path, {}, /* for_disk_s3= */ true);
|
||||
auto settings_ptr = s3_settings.get();
|
||||
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
|
||||
copyS3File(client_ptr, bucket, object_from.absolute_path, 0, size, bucket, object_to.absolute_path,
|
||||
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
|
||||
}
|
||||
|
||||
void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_)
|
||||
|
@ -8,8 +8,6 @@
|
||||
#include <Disks/ObjectStorages/S3/S3Capabilities.h>
|
||||
#include <memory>
|
||||
#include <aws/s3/S3Client.h>
|
||||
#include <aws/s3/model/HeadObjectResult.h>
|
||||
#include <aws/s3/model/ListObjectsV2Result.h>
|
||||
#include <Storages/StorageS3Settings.h>
|
||||
#include <Common/MultiVersion.h>
|
||||
#include <Common/logger_useful.h>
|
||||
@ -167,28 +165,9 @@ private:
|
||||
|
||||
void setNewClient(std::unique_ptr<Aws::S3::S3Client> && client_);
|
||||
|
||||
void copyObjectImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
size_t size,
|
||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||
|
||||
void copyObjectMultipartImpl(
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
const String & dst_bucket,
|
||||
const String & dst_key,
|
||||
size_t size,
|
||||
std::optional<ObjectAttributes> metadata = std::nullopt) const;
|
||||
|
||||
void removeObjectImpl(const StoredObject & object, bool if_exists);
|
||||
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
|
||||
|
||||
size_t getObjectSize(const std::string & bucket_from, const std::string & key) const;
|
||||
void checkObjectExists(const std::string & bucket_from, const std::string & key, std::string_view description) const;
|
||||
|
||||
std::string bucket;
|
||||
|
||||
MultiVersion<Aws::S3::S3Client> client;
|
||||
|
@ -1,4 +1,4 @@
|
||||
#include <IO/S3/copyDataToS3.h>
|
||||
#include <IO/S3/copyS3File.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
|
||||
@ -22,8 +22,15 @@ namespace ProfileEvents
|
||||
{
|
||||
extern const Event S3CreateMultipartUpload;
|
||||
extern const Event S3CompleteMultipartUpload;
|
||||
extern const Event S3UploadPart;
|
||||
extern const Event S3PutObject;
|
||||
extern const Event S3UploadPart;
|
||||
extern const Event S3UploadPartCopy;
|
||||
|
||||
extern const Event DiskS3CreateMultipartUpload;
|
||||
extern const Event DiskS3CompleteMultipartUpload;
|
||||
extern const Event DiskS3PutObject;
|
||||
extern const Event DiskS3UploadPart;
|
||||
extern const Event DiskS3UploadPartCopy;
|
||||
}
|
||||
|
||||
|
||||
@ -50,6 +57,7 @@ namespace
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
const std::optional<std::map<String, String>> & object_metadata_,
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
bool for_disk_s3_,
|
||||
const Poco::Logger * log_)
|
||||
: client_ptr(client_ptr_)
|
||||
, dest_bucket(dest_bucket_)
|
||||
@ -59,6 +67,7 @@ namespace
|
||||
, max_unexpected_write_error_retries(request_settings_.max_unexpected_write_error_retries)
|
||||
, object_metadata(object_metadata_)
|
||||
, schedule(schedule_)
|
||||
, for_disk_s3(for_disk_s3_)
|
||||
, log(log_)
|
||||
{
|
||||
}
|
||||
@ -74,6 +83,7 @@ namespace
|
||||
size_t max_unexpected_write_error_retries;
|
||||
const std::optional<std::map<String, String>> & object_metadata;
|
||||
ThreadPoolCallbackRunner<void> schedule;
|
||||
bool for_disk_s3;
|
||||
const Poco::Logger * log;
|
||||
|
||||
struct UploadPartTask
|
||||
@ -111,6 +121,8 @@ namespace
|
||||
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CreateMultipartUpload);
|
||||
|
||||
auto outcome = client_ptr->CreateMultipartUpload(request);
|
||||
|
||||
@ -151,6 +163,8 @@ namespace
|
||||
for (size_t i = 0; i < max_retry; ++i)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
|
||||
|
||||
auto outcome = client_ptr->CompleteMultipartUpload(request);
|
||||
|
||||
@ -379,11 +393,11 @@ namespace
|
||||
}
|
||||
};
|
||||
|
||||
/// Helper class to help implementing copyDataToS3().
|
||||
class CopyDataToS3Helper : public UploadHelper
|
||||
/// Helper class to help implementing copyDataToS3File().
|
||||
class CopyDataToFileHelper : public UploadHelper
|
||||
{
|
||||
public:
|
||||
CopyDataToS3Helper(
|
||||
CopyDataToFileHelper(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer_,
|
||||
size_t offset_,
|
||||
size_t size_,
|
||||
@ -392,8 +406,9 @@ namespace
|
||||
const String & dest_key_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
const std::optional<std::map<String, String>> & object_metadata_,
|
||||
ThreadPoolCallbackRunner<void> schedule_)
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, &Poco::Logger::get("copyDataToS3"))
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
bool for_disk_s3_)
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
|
||||
, create_read_buffer(create_read_buffer_)
|
||||
, offset(offset_)
|
||||
, size(size_)
|
||||
@ -448,6 +463,9 @@ namespace
|
||||
for (size_t i = 0; i < max_retry; ++i)
|
||||
{
|
||||
ProfileEvents::increment(ProfileEvents::S3PutObject);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3PutObject);
|
||||
|
||||
auto outcome = client_ptr->PutObject(request);
|
||||
|
||||
if (outcome.IsSuccess())
|
||||
@ -523,6 +541,8 @@ namespace
|
||||
auto & req = typeid_cast<Aws::S3::Model::UploadPartRequest &>(request);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3UploadPart);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3UploadPart);
|
||||
|
||||
auto outcome = client_ptr->UploadPart(req);
|
||||
if (!outcome.IsSuccess())
|
||||
@ -535,11 +555,11 @@ namespace
|
||||
}
|
||||
};
|
||||
|
||||
/// Helper class to help implementing copyFileS3ToS3().
|
||||
class CopyFileS3ToS3Helper : public UploadHelper
|
||||
/// Helper class to help implementing copyS3File().
|
||||
class CopyFileHelper : public UploadHelper
|
||||
{
|
||||
public:
|
||||
CopyFileS3ToS3Helper(
|
||||
CopyFileHelper(
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
|
||||
const String & src_bucket_,
|
||||
const String & src_key_,
|
||||
@ -549,8 +569,9 @@ namespace
|
||||
const String & dest_key_,
|
||||
const S3Settings::RequestSettings & request_settings_,
|
||||
const std::optional<std::map<String, String>> & object_metadata_,
|
||||
ThreadPoolCallbackRunner<void> schedule_)
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, &Poco::Logger::get("copyFileS3ToS3"))
|
||||
ThreadPoolCallbackRunner<void> schedule_,
|
||||
bool for_disk_s3_)
|
||||
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
|
||||
, src_bucket(src_bucket_)
|
||||
, src_key(src_key_)
|
||||
, offset(src_offset_)
|
||||
@ -676,6 +697,10 @@ namespace
|
||||
{
|
||||
auto & req = typeid_cast<Aws::S3::Model::UploadPartCopyRequest &>(request);
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::S3UploadPartCopy);
|
||||
if (for_disk_s3)
|
||||
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
|
||||
|
||||
auto outcome = client_ptr->UploadPartCopy(req);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
@ -689,7 +714,7 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
void copyDataToS3(
|
||||
void copyDataToS3File(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
@ -698,14 +723,15 @@ void copyDataToS3(
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
const std::optional<std::map<String, String>> & object_metadata,
|
||||
ThreadPoolCallbackRunner<void> schedule)
|
||||
ThreadPoolCallbackRunner<void> schedule,
|
||||
bool for_disk_s3)
|
||||
{
|
||||
CopyDataToS3Helper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule};
|
||||
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
|
||||
helper.performCopy();
|
||||
}
|
||||
|
||||
|
||||
void copyFileS3ToS3(
|
||||
void copyS3File(
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
@ -715,9 +741,10 @@ void copyFileS3ToS3(
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
const std::optional<std::map<String, String>> & object_metadata,
|
||||
ThreadPoolCallbackRunner<void> schedule)
|
||||
ThreadPoolCallbackRunner<void> schedule,
|
||||
bool for_disk_s3)
|
||||
{
|
||||
CopyFileS3ToS3Helper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule};
|
||||
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
|
||||
helper.performCopy();
|
||||
}
|
||||
|
@ -16,27 +16,11 @@ namespace DB
|
||||
{
|
||||
class SeekableReadBuffer;
|
||||
|
||||
/// Copies data from any seekable source to S3.
|
||||
/// The same functionality can be done by using the function copyData() and the class WriteBufferFromS3
|
||||
/// however copyDataToS3() is faster and spends less memory.
|
||||
/// The callback `create_read_buffer` can be called from multiple threads in parallel, so that should be thread-safe.
|
||||
/// The parameters `offset` and `size` specify a part in the source to copy.
|
||||
void copyDataToS3(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
|
||||
const String & dest_bucket,
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
|
||||
ThreadPoolCallbackRunner<void> schedule_ = {});
|
||||
|
||||
/// Copies a file from S3 to S3.
|
||||
/// The same functionality can be done by using the function copyData() and the classes ReadBufferFromS3 and WriteBufferFromS3
|
||||
/// however copyFileS3ToS3() is faster and spends less network traffic and memory.
|
||||
/// however copyS3File() is faster and spends less network traffic and memory.
|
||||
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
|
||||
void copyFileS3ToS3(
|
||||
void copyS3File(
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
|
||||
const String & src_bucket,
|
||||
const String & src_key,
|
||||
@ -46,7 +30,25 @@ void copyFileS3ToS3(
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
|
||||
ThreadPoolCallbackRunner<void> schedule_ = {});
|
||||
ThreadPoolCallbackRunner<void> schedule_ = {},
|
||||
bool for_disk_s3 = false);
|
||||
|
||||
/// Copies data from any seekable source to S3.
|
||||
/// The same functionality can be done by using the function copyData() and the class WriteBufferFromS3
|
||||
/// however copyDataToS3File() is faster and spends less memory.
|
||||
/// The callback `create_read_buffer` can be called from multiple threads in parallel, so that should be thread-safe.
|
||||
/// The parameters `offset` and `size` specify a part in the source to copy.
|
||||
void copyDataToS3File(
|
||||
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
|
||||
size_t offset,
|
||||
size_t size,
|
||||
const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
|
||||
const String & dest_bucket,
|
||||
const String & dest_key,
|
||||
const S3Settings::RequestSettings & settings,
|
||||
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
|
||||
ThreadPoolCallbackRunner<void> schedule_ = {},
|
||||
bool for_disk_s3 = false);
|
||||
|
||||
}
|
||||
|
@ -127,7 +127,7 @@ def test_backup_to_s3_multipart():
|
||||
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
|
||||
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
|
||||
assert node.contains_in_log(
|
||||
f"copyDataToS3: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
|
||||
f"copyDataToS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
|
||||
)
|
||||
|
||||
|
||||
@ -140,7 +140,7 @@ def test_backup_to_s3_native_copy():
|
||||
check_backup_and_restore(storage_policy, backup_destination)
|
||||
assert node.contains_in_log("using native copy")
|
||||
assert node.contains_in_log(
|
||||
f"copyFileS3ToS3: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
|
||||
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
|
||||
)
|
||||
|
||||
|
||||
@ -153,7 +153,7 @@ def test_backup_to_s3_native_copy_other_bucket():
|
||||
check_backup_and_restore(storage_policy, backup_destination)
|
||||
assert node.contains_in_log("using native copy")
|
||||
assert node.contains_in_log(
|
||||
f"copyFileS3ToS3: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
|
||||
f"copyS3File: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
|
||||
)
|
||||
|
||||
|
||||
@ -164,5 +164,5 @@ def test_backup_to_s3_native_copy_multipart():
|
||||
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
|
||||
assert node.contains_in_log("using native copy")
|
||||
assert node.contains_in_log(
|
||||
f"copyFileS3ToS3: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
|
||||
f"copyS3File: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user