Split upload into parts of the same size for smooth uploading.

Correctly use AbortMultipleUpload request.
Support std::ios_base::end StdStreamBufFromReadBuffer::seekpos().
This commit is contained in:
Vitaly Baranov 2023-01-16 14:55:26 +01:00
parent 14a7ee8e26
commit 1c845185c1
5 changed files with 97 additions and 52 deletions

View File

@ -34,6 +34,7 @@ namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
@ -83,7 +84,9 @@ namespace
std::exception_ptr exception;
};
size_t normal_part_size;
String multipart_upload_id;
std::atomic<bool> multipart_upload_aborted = false;
Strings part_tags;
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
@ -122,6 +125,9 @@ namespace
void completeMultipartUpload()
{
if (multipart_upload_aborted)
return;
LOG_TRACE(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
if (part_tags.empty())
@ -172,11 +178,13 @@ namespace
void abortMultipartUpload()
{
LOG_TRACE(log, "Aborting multipart upload. Bucket: {}, Key: {}, Upload_id: {}", dest_bucket, dest_key, multipart_upload_id);
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dest_bucket);
abort_request.SetKey(dest_key);
abort_request.SetUploadId(multipart_upload_id);
client_ptr->AbortMultipartUpload(abort_request);
multipart_upload_aborted = true;
}
void checkObjectAfterUpload()
@ -192,33 +200,82 @@ namespace
void performMultipartUpload(size_t start_offset, size_t size)
{
calculatePartSize(size);
createMultipartUpload();
size_t position = start_offset;
size_t end_position = start_offset + size;
size_t upload_part_size = settings.min_upload_part_size;
for (size_t part_number = 1; position < end_position; ++part_number)
{
size_t next_position = std::min(position + upload_part_size, end_position);
if (multipart_upload_aborted)
break; /// No more part uploads.
uploadPart(part_number, position, next_position - position, size);
size_t next_position = std::min(position + normal_part_size, end_position);
size_t part_size = next_position - position; /// `part_size` is either `normal_part_size` or smaller if it's the final part.
uploadPart(part_number, position, part_size);
position = next_position;
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
if (part_number % settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, settings.max_upload_part_size);
}
}
waitForAllBackGroundTasks();
completeMultipartUpload();
}
void uploadPart(size_t part_number, size_t part_offset, size_t part_size, size_t total_size)
void calculatePartSize(size_t total_size)
{
if (!total_size)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");
if (!settings.max_part_number)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_part_number must not be 0");
else if (!settings.min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
else if (settings.max_upload_part_size < settings.min_upload_part_size)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");
size_t part_size = settings.min_upload_part_size;
size_t num_parts = (total_size + part_size - 1) / part_size;
if (num_parts > settings.max_part_number)
{
part_size = (total_size + settings.max_part_number - 1) / settings.max_part_number;
num_parts = (total_size + part_size - 1) / part_size;
}
if (part_size > settings.max_upload_part_size)
{
part_size = settings.max_upload_part_size;
num_parts = (total_size + part_size - 1) / part_size;
}
if (num_parts < 1 || num_parts > settings.max_part_number || part_size < settings.min_upload_part_size
|| part_size > settings.max_upload_part_size)
{
String msg;
if (num_parts < 1)
msg = "Number of parts is zero";
else if (num_parts > settings.max_part_number)
msg = fmt::format("Number of parts exceeds {}", num_parts, settings.max_part_number);
else if (part_size < settings.min_upload_part_size)
msg = fmt::format("Size of a part is less than {}", part_size, settings.min_upload_part_size);
else
msg = fmt::format("Size of a part exceeds {}", part_size, settings.max_upload_part_size);
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"{} while writing {} bytes to S3. Check max_part_number = {}, "
"min_upload_part_size = {}, max_upload_part_size = {}, max_single_part_upload_size = {}",
msg, total_size, settings.max_part_number, settings.min_upload_part_size,
settings.max_upload_part_size, settings.max_single_part_upload_size);
}
/// We've calculated the size of a normal part (the final part can be smaller).
normal_part_size = part_size;
}
void uploadPart(size_t part_number, size_t part_offset, size_t part_size)
{
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Size: {}", dest_bucket, dest_key, multipart_upload_id, part_size);
@ -228,17 +285,6 @@ namespace
return;
}
if (part_number > settings.max_part_number)
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_part_upload_size = {}",
settings.max_part_number, total_size, settings.min_upload_part_size, settings.max_upload_part_size,
settings.upload_part_size_multiply_factor, settings.upload_part_size_multiply_parts_count_threshold,
settings.max_single_part_upload_size);
}
if (schedule)
{
UploadPartTask * task = nullptr;
@ -319,7 +365,15 @@ namespace
for (auto & task : tasks)
{
if (task.exception)
{
/// abortMultipartUpload() might be called already, see processUploadPartRequest().
/// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
/// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
/// all storage consumed by all parts.
abortMultipartUpload();
std::rethrow_exception(task.exception);
}
part_tags.push_back(task.tag);
}
@ -377,7 +431,7 @@ namespace
request.SetBucket(dest_bucket);
request.SetKey(dest_key);
request.SetContentLength(size);
request.SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer)));
request.SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), size));
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
@ -457,7 +511,7 @@ namespace
request->SetPartNumber(static_cast<int>(part_number));
request->SetUploadId(multipart_upload_id);
request->SetContentLength(part_size);
request->SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer)));
request->SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
request->SetContentType("binary/octet-stream");
@ -507,9 +561,6 @@ namespace
void performCopy()
{
if (size == static_cast<size_t>(-1))
size = getSourceSize() - offset;
if (size <= settings.max_single_operation_copy_size)
performSingleOperationCopy();
else
@ -635,12 +686,6 @@ namespace
return outcome.GetResult().GetCopyPartResult().GetETag();
}
size_t getSourceSize()
{
auto head = S3::headObject(*client_ptr, src_bucket, src_key).GetResult();
return head.GetContentLength();
}
};
}

View File

@ -35,8 +35,7 @@ void copyDataToS3(
/// Copies a file from S3 to S3.
/// The same functionality can be done by using the function copyData() and the classes ReadBufferFromS3 and WriteBufferFromS3
/// however copyFileS3ToS3() is faster and spends less network traffic and memory.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy;
/// if `src_offset == 0` and `src_size == -1` that means the entire source file will be copied.
/// The parameters `src_offset` and `src_size` specify a part in the source to copy.
void copyFileS3ToS3(
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
const String & src_bucket,

View File

@ -12,13 +12,12 @@ namespace ErrorCodes
}
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_)
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_, size_t size_)
: read_buffer(std::move(read_buffer_)), seekable_read_buffer(dynamic_cast<SeekableReadBuffer *>(read_buffer.get())), size(size_)
{
read_buffer = std::move(read_buffer_);
seekable_read_buffer = dynamic_cast<SeekableReadBuffer *>(read_buffer.get());
}
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_)
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_, size_t size_) : size(size_)
{
if (dynamic_cast<SeekableReadBuffer *>(&read_buffer_))
{
@ -55,11 +54,12 @@ std::streampos StdStreamBufFromReadBuffer::seekoff(std::streamoff off, std::ios_
{
if (dir == std::ios_base::beg)
return seekpos(off, which);
if (dir == std::ios_base::cur)
else if (dir == std::ios_base::cur)
return seekpos(getCurrentPosition() + off, which);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong seek's base {}", static_cast<int>(dir));
else if (dir == std::ios_base::end)
return seekpos(size, which);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong seek's base {}", static_cast<int>(dir));
}
std::streampos StdStreamBufFromReadBuffer::seekpos(std::streampos pos, std::ios_base::openmode which)

View File

@ -15,24 +15,25 @@ class StdStreamBufFromReadBuffer : public std::streambuf
public:
using Base = std::streambuf;
explicit StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_);
explicit StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_);
explicit StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_, size_t size_);
explicit StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_, size_t size_);
~StdStreamBufFromReadBuffer() override;
private:
int underflow() override;
std::streamsize showmanyc() override;
std::streamsize xsgetn(char* s, std::streamsize count) override;
std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) override;
std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which) override;
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which) override;
std::streamsize xsputn(const char* s, std::streamsize n) override;
int overflow(int c = std::char_traits<char>::eof()) override;
int overflow(int c) override;
std::streampos getCurrentPosition() const;
std::unique_ptr<ReadBuffer> read_buffer;
SeekableReadBuffer * seekable_read_buffer = nullptr;
size_t size;
};
}

View File

@ -14,8 +14,8 @@ class StdIStreamFromReadBuffer : public std::istream
{
public:
using Base = std::istream;
StdIStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf) : Base(&stream_buf), stream_buf(std::move(buf)) { }
StdIStreamFromReadBuffer(ReadBuffer & buf) : Base(&stream_buf), stream_buf(buf) { }
StdIStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf, size_t size) : Base(&stream_buf), stream_buf(std::move(buf), size) { }
StdIStreamFromReadBuffer(ReadBuffer & buf, size_t size) : Base(&stream_buf), stream_buf(buf, size) { }
StdStreamBufFromReadBuffer * rdbuf() const { return const_cast<StdStreamBufFromReadBuffer *>(&stream_buf); }
private:
@ -28,8 +28,8 @@ class StdStreamFromReadBuffer : public std::iostream
{
public:
using Base = std::iostream;
StdStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf) : Base(&stream_buf), stream_buf(std::move(buf)) { }
StdStreamFromReadBuffer(ReadBuffer & buf) : Base(&stream_buf), stream_buf(buf) { }
StdStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf, size_t size) : Base(&stream_buf), stream_buf(std::move(buf), size) { }
StdStreamFromReadBuffer(ReadBuffer & buf, size_t size) : Base(&stream_buf), stream_buf(buf, size) { }
StdStreamBufFromReadBuffer * rdbuf() const { return const_cast<StdStreamBufFromReadBuffer *>(&stream_buf); }
private: