#include <Common/config.h>

#if USE_AWS_S3

#include <IO/WriteBufferFromS3.h>
#include <IO/WriteHelpers.h>

#include <common/logger_useful.h>

#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>

#include <utility>


namespace DB
{

// The S3 protocol does not allow a multipart upload to have more than 10000 parts.
// If the server does not return an error when that number is exceeded, we only print a warning,
// because a custom S3 implementation may have relaxed requirements on this.
const int S3_WARN_MAX_PARTS = 10000;

namespace ErrorCodes
{
    extern const int S3_ERROR;
}
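
// The constructor starts a multipart upload right away; data written to the buffer is
// accumulated in memory and uploaded in parts once it grows past minimum_upload_part_size.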
WriteBufferFromS3::WriteBufferFromS3(
    std::shared_ptr<Aws::S3::S3Client> client_ptr_,
    const String & bucket_,
    const String & key_,
    size_t minimum_upload_part_size_,
    size_t buffer_size_
)
    : BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
    , bucket(bucket_)
    , key(key_)
    , client_ptr(std::move(client_ptr_))
    , minimum_upload_part_size {minimum_upload_part_size_}
    , temporary_buffer {std::make_unique<WriteBufferFromString>(buffer_string)}
    , last_part_size {0}
{
    initiate();
}
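
// Appends the filled part of the working buffer to the in-memory temporary buffer.
// When the accumulated data exceeds minimum_upload_part_size, it is uploaded as one part
// and the temporary buffer is reset.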
void WriteBufferFromS3::nextImpl()
{
    if (!offset())
        return;

    temporary_buffer->write(working_buffer.begin(), offset());

    last_part_size += offset();

    if (last_part_size > minimum_upload_part_size)
    {
        temporary_buffer->finish();
        writePart(buffer_string);
        last_part_size = 0;
        temporary_buffer = std::make_unique<WriteBufferFromString>(buffer_string);
    }
}
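
// Uploads whatever remains in the temporary buffer as the final part (it may be smaller
// than minimum_upload_part_size) and then completes the multipart upload.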
void WriteBufferFromS3::finalize()
{
    temporary_buffer->finish();
    if (!buffer_string.empty())
    {
        writePart(buffer_string);
    }

    complete();
}
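
// Flush any data still left in the working buffer. Destructors must not throw,
// so errors are logged instead of being propagated.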
WriteBufferFromS3::~WriteBufferFromS3()
{
    try
    {
        next();
    }
    catch (...)
    {
        tryLogCurrentException(__PRETTY_FUNCTION__);
    }
}
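
// Starts the multipart upload and remembers the upload id returned by the server;
// it is needed by every subsequent UploadPart and CompleteMultipartUpload request.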
void WriteBufferFromS3::initiate()
{
    Aws::S3::Model::CreateMultipartUploadRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);

    auto outcome = client_ptr->CreateMultipartUpload(req);

    if (outcome.IsSuccess())
    {
        upload_id = outcome.GetResult().GetUploadId();
        LOG_DEBUG(log, "Multipart upload initiated. Upload id = " + upload_id);
    }
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
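
// Uploads a single part. Part numbers are 1-based and assigned sequentially;
// the ETag returned by the server is saved so the upload can be completed later.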
void WriteBufferFromS3::writePart(const String & data)
{
    if (part_tags.size() == S3_WARN_MAX_PARTS)
    {
        // Don't throw an exception here ourselves; leave that decision to the S3 server.
        LOG_WARNING(log, "Maximum part number in S3 protocol has been reached (too many parts). Server may not accept this whole upload.");
    }

    Aws::S3::Model::UploadPartRequest req;

    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetPartNumber(part_tags.size() + 1);
    req.SetUploadId(upload_id);
    req.SetContentLength(data.size());
    req.SetBody(std::make_shared<Aws::StringStream>(data));

    auto outcome = client_ptr->UploadPart(req);

    if (outcome.IsSuccess())
    {
        auto etag = outcome.GetResult().GetETag();
        part_tags.push_back(etag);
        LOG_DEBUG(log, "Write part " + std::to_string(part_tags.size()) + " finished. Upload id = " + upload_id + ". Etag = " + etag);
    }
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
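
// Completes the multipart upload by sending the list of part numbers and ETags collected
// by writePart(). The object becomes visible in the bucket only after this call succeeds.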
void WriteBufferFromS3::complete()
{
    Aws::S3::Model::CompleteMultipartUploadRequest req;
    req.SetBucket(bucket);
    req.SetKey(key);
    req.SetUploadId(upload_id);

    Aws::S3::Model::CompletedMultipartUpload multipart_upload;
    for (size_t i = 0; i < part_tags.size(); i++)
    {
        Aws::S3::Model::CompletedPart part;
        multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
    }

    req.SetMultipartUpload(multipart_upload);

    auto outcome = client_ptr->CompleteMultipartUpload(req);

    if (outcome.IsSuccess())
        LOG_DEBUG(log, "Multipart upload completed. Upload_id = " + upload_id);
    else
        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}
#endif