mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-21 15:12:02 +00:00
Merge pull request #27858 from ianton-ru/MDB-14264
Use Multipart copy upload for large S3 objects
This commit is contained in:
commit
bce6d092ea
10
base/common/unit.h
Normal file
10
base/common/unit.h
Normal file
@ -0,0 +1,10 @@
|
||||
#pragma once
|
||||
#include <cstddef>
|
||||
|
||||
constexpr size_t KiB = 1024;
|
||||
constexpr size_t MiB = 1024 * KiB;
|
||||
constexpr size_t GiB = 1024 * MiB;
|
||||
|
||||
constexpr size_t operator"" _KiB(unsigned long long val) { return val * KiB; }
|
||||
constexpr size_t operator"" _MiB(unsigned long long val) { return val * MiB; }
|
||||
constexpr size_t operator"" _GiB(unsigned long long val) { return val * GiB; }
|
@ -6,25 +6,37 @@
|
||||
#include <bitset>
|
||||
#include <random>
|
||||
#include <utility>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <common/unit.h>
|
||||
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
|
||||
#include <Disks/ReadIndirectBufferFromRemoteFS.h>
|
||||
#include <Disks/WriteIndirectBufferFromRemoteFS.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <IO/ReadBufferFromS3.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/SeekAvoidingReadBuffer.h>
|
||||
#include <IO/WriteBufferFromS3.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Common/createHardLink.h>
|
||||
#include <Common/quoteString.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Common/checkStackSize.h>
|
||||
#include <boost/algorithm/string.hpp>
|
||||
|
||||
#include <aws/s3/model/CopyObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/DeleteObjectsRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/GetObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/ListObjectsV2Request.h> // Y_IGNORE
|
||||
#include <aws/s3/model/HeadObjectRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/CreateMultipartUploadRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/CompleteMultipartUploadRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/UploadPartCopyRequest.h> // Y_IGNORE
|
||||
#include <aws/s3/model/AbortMultipartUploadRequest.h> // Y_IGNORE
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -388,16 +400,7 @@ void DiskS3::saveSchemaVersion(const int & version)
|
||||
|
||||
void DiskS3::updateObjectMetadata(const String & key, const ObjectMetadata & metadata)
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(bucket + "/" + key);
|
||||
request.SetBucket(bucket);
|
||||
request.SetKey(key);
|
||||
request.SetMetadata(metadata);
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
|
||||
auto outcome = settings->client->CopyObject(request);
|
||||
throwIfError(outcome);
|
||||
copyObjectImpl(bucket, key, bucket, key, std::nullopt, metadata);
|
||||
}
|
||||
|
||||
void DiskS3::migrateFileToRestorableSchema(const String & path)
|
||||
@ -553,18 +556,124 @@ void DiskS3::listObjects(const String & source_bucket, const String & source_pat
|
||||
} while (outcome.GetResult().GetIsTruncated());
|
||||
}
|
||||
|
||||
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const
|
||||
void DiskS3::copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head) const
|
||||
{
|
||||
if (head && (head->GetContentLength() >= static_cast<Int64>(5_GiB)))
|
||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head);
|
||||
else
|
||||
copyObjectImpl(src_bucket, src_key, dst_bucket, dst_key);
|
||||
}
|
||||
|
||||
void DiskS3::copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
|
||||
{
|
||||
auto settings = current_settings.get();
|
||||
Aws::S3::Model::CopyObjectRequest request;
|
||||
request.SetCopySource(src_bucket + "/" + src_key);
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
{
|
||||
request.SetMetadata(*metadata);
|
||||
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
|
||||
}
|
||||
|
||||
auto outcome = settings->client->CopyObject(request);
|
||||
|
||||
if (!outcome.IsSuccess() && outcome.GetError().GetExceptionName() == "EntityTooLarge")
|
||||
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
|
||||
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, head, metadata);
|
||||
return;
|
||||
}
|
||||
|
||||
throwIfError(outcome);
|
||||
}
|
||||
|
||||
void DiskS3::copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata) const
|
||||
{
|
||||
LOG_DEBUG(log, "Multipart copy upload has created. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, Metadata: {}",
|
||||
src_bucket, src_key, dst_bucket, dst_key, metadata ? "REPLACE" : "NOT_SET");
|
||||
|
||||
auto settings = current_settings.get();
|
||||
|
||||
if (!head)
|
||||
head = headObject(src_bucket, src_key);
|
||||
|
||||
size_t size = head->GetContentLength();
|
||||
|
||||
String multipart_upload_id;
|
||||
|
||||
{
|
||||
Aws::S3::Model::CreateMultipartUploadRequest request;
|
||||
request.SetBucket(dst_bucket);
|
||||
request.SetKey(dst_key);
|
||||
if (metadata)
|
||||
request.SetMetadata(*metadata);
|
||||
|
||||
auto outcome = settings->client->CreateMultipartUpload(request);
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
multipart_upload_id = outcome.GetResult().GetUploadId();
|
||||
}
|
||||
|
||||
std::vector<String> part_tags;
|
||||
|
||||
size_t upload_part_size = settings->s3_min_upload_part_size;
|
||||
for (size_t position = 0, part_number = 1; position < size; ++part_number, position += upload_part_size)
|
||||
{
|
||||
Aws::S3::Model::UploadPartCopyRequest part_request;
|
||||
part_request.SetCopySource(src_bucket + "/" + src_key);
|
||||
part_request.SetBucket(dst_bucket);
|
||||
part_request.SetKey(dst_key);
|
||||
part_request.SetUploadId(multipart_upload_id);
|
||||
part_request.SetPartNumber(part_number);
|
||||
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, std::min(size, position + upload_part_size) - 1));
|
||||
|
||||
auto outcome = settings->client->UploadPartCopy(part_request);
|
||||
if (!outcome.IsSuccess())
|
||||
{
|
||||
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
|
||||
abort_request.SetBucket(dst_bucket);
|
||||
abort_request.SetKey(dst_key);
|
||||
abort_request.SetUploadId(multipart_upload_id);
|
||||
settings->client->AbortMultipartUpload(abort_request);
|
||||
// In error case we throw exception later with first error from UploadPartCopy
|
||||
}
|
||||
throwIfError(outcome);
|
||||
|
||||
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
|
||||
part_tags.push_back(etag);
|
||||
}
|
||||
|
||||
{
|
||||
Aws::S3::Model::CompleteMultipartUploadRequest req;
|
||||
req.SetBucket(dst_bucket);
|
||||
req.SetKey(dst_key);
|
||||
req.SetUploadId(multipart_upload_id);
|
||||
|
||||
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
|
||||
for (size_t i = 0; i < part_tags.size(); ++i)
|
||||
{
|
||||
Aws::S3::Model::CompletedPart part;
|
||||
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
|
||||
}
|
||||
|
||||
req.SetMultipartUpload(multipart_upload);
|
||||
|
||||
auto outcome = settings->client->CompleteMultipartUpload(req);
|
||||
|
||||
throwIfError(outcome);
|
||||
|
||||
LOG_DEBUG(log, "Multipart copy upload has completed. Src Bucket: {}, Src Key: {}, Dst Bucket: {}, Dst Key: {}, "
|
||||
"Upload_id: {}, Parts: {}", src_bucket, src_key, dst_bucket, dst_key, multipart_upload_id, part_tags.size());
|
||||
}
|
||||
}
|
||||
|
||||
struct DiskS3::RestoreInformation
|
||||
{
|
||||
UInt64 revision = LATEST_REVISION;
|
||||
@ -757,7 +866,7 @@ void DiskS3::processRestoreFiles(const String & source_bucket, const String & so
|
||||
|
||||
/// Copy object if we restore to different bucket / path.
|
||||
if (bucket != source_bucket || remote_fs_root_path != source_path)
|
||||
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key);
|
||||
copyObject(source_bucket, key, bucket, remote_fs_root_path + relative_key, head_result);
|
||||
|
||||
metadata.addObject(relative_key, head_result.GetContentLength());
|
||||
metadata.save();
|
||||
|
@ -7,6 +7,7 @@
|
||||
#if USE_AWS_S3
|
||||
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <common/logger_useful.h>
|
||||
#include "Disks/DiskFactory.h"
|
||||
#include "Disks/Executor.h"
|
||||
@ -131,7 +132,15 @@ private:
|
||||
|
||||
Aws::S3::Model::HeadObjectResult headObject(const String & source_bucket, const String & key) const;
|
||||
void listObjects(const String & source_bucket, const String & source_path, std::function<bool(const Aws::S3::Model::ListObjectsV2Result &)> callback) const;
|
||||
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key) const;
|
||||
void copyObject(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt) const;
|
||||
|
||||
void copyObjectImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
|
||||
void copyObjectMultipartImpl(const String & src_bucket, const String & src_key, const String & dst_bucket, const String & dst_key,
|
||||
std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
|
||||
std::optional<std::reference_wrapper<const ObjectMetadata>> metadata = std::nullopt) const;
|
||||
|
||||
/// Restore S3 metadata files on file system.
|
||||
void restore();
|
||||
|
Loading…
Reference in New Issue
Block a user