Merge pull request #45188 from vitlibar/backup-to-s3-memory-optimization

Optimize memory consumption during backup to S3
This commit is contained in:
Vitaly Baranov 2023-01-21 12:37:35 +01:00 committed by GitHub
commit f0fda580d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 1262 additions and 365 deletions

View File

@ -19,7 +19,7 @@ BackupEntryFromAppendOnlyFile::BackupEntryFromAppendOnlyFile(
std::unique_ptr<SeekableReadBuffer> BackupEntryFromAppendOnlyFile::getReadBuffer() const
{
auto buf = BackupEntryFromImmutableFile::getReadBuffer();
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), limit);
return std::make_unique<LimitSeekableReadBuffer>(std::move(buf), 0, limit);
}
}

View File

@ -12,16 +12,19 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
void IBackupWriter::copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name)
void IBackupWriter::copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
auto write_buffer = writeFile(file_name);
copyData(*source, *write_buffer);
auto read_buffer = create_read_buffer();
if (offset)
read_buffer->seek(offset, SEEK_SET);
auto write_buffer = writeFile(dest_file_name);
copyData(*read_buffer, *write_buffer, size);
write_buffer->finalize();
}
void IBackupWriter::copyFileNative(DiskPtr /* from_disk */, const String & /* file_name_from */, const String & /* file_name_to */)
/// Base-class implementation of native copying: it always throws NOT_IMPLEMENTED.
/// All parameters are intentionally unnamed because this implementation does not use them.
void IBackupWriter::copyFileNative(
DiskPtr /* src_disk */, const String & /* src_file_name */, UInt64 /* src_offset */, UInt64 /* src_size */, const String & /* dest_file_name */)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Native copy not implemented for backup writer");
}
}

View File

@ -24,6 +24,8 @@ public:
class IBackupWriter /// BackupWriterFile, BackupWriterDisk
{
public:
using CreateReadBufferFunction = std::function<std::unique_ptr<SeekableReadBuffer>()>;
virtual ~IBackupWriter() = default;
virtual bool fileExists(const String & file_name) = 0;
virtual UInt64 getFileSize(const String & file_name) = 0;
@ -32,14 +34,9 @@ public:
virtual void removeFile(const String & file_name) = 0;
virtual void removeFiles(const Strings & file_names) = 0;
virtual DataSourceDescription getDataSourceDescription() const = 0;
virtual void copyFileThroughBuffer(std::unique_ptr<SeekableReadBuffer> && source, const String & file_name);
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const
{
return false;
}
virtual void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to);
virtual void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name);
virtual bool supportNativeCopy(DataSourceDescription /* data_source_description */) const { return false; }
virtual void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name);
};
}

View File

@ -105,13 +105,21 @@ bool BackupWriterDisk::supportNativeCopy(DataSourceDescription data_source_descr
return data_source_description == disk->getDataSourceDescription();
}
void BackupWriterDisk::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
void BackupWriterDisk::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
if (!from_disk)
if (!src_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");
auto file_path = path / file_name_to;
if ((src_offset != 0) || (src_size != src_disk->getFileSize(src_file_name)))
{
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
auto file_path = path / dest_file_name;
disk->createDirectories(file_path.parent_path());
from_disk->copyFile(file_name_from, *disk, file_path);
src_disk->copyFile(src_file_name, *disk, file_path);
}
}

View File

@ -39,8 +39,8 @@ public:
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
private:
DiskPtr disk;
std::filesystem::path path;

View File

@ -125,17 +125,24 @@ bool BackupWriterFile::supportNativeCopy(DataSourceDescription data_source_descr
return data_source_description == getDataSourceDescription();
}
void BackupWriterFile::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
void BackupWriterFile::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
auto file_path = path / file_name_to;
fs::create_directories(file_path.parent_path());
std::string abs_source_path;
if (from_disk)
abs_source_path = fullPath(from_disk, file_name_from);
if (src_disk)
abs_source_path = fullPath(src_disk, src_file_name);
else
abs_source_path = fs::absolute(file_name_from);
abs_source_path = fs::absolute(src_file_name);
fs::copy(abs_source_path, file_path, fs::copy_options::recursive | fs::copy_options::overwrite_existing);
if ((src_offset != 0) || (src_size != fs::file_size(abs_source_path)))
{
auto create_read_buffer = [abs_source_path] { return createReadBufferFromFileBase(abs_source_path, {}); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
return;
}
auto file_path = path / dest_file_name;
fs::create_directories(file_path.parent_path());
fs::copy(abs_source_path, file_path, fs::copy_options::overwrite_existing);
}
}

View File

@ -35,8 +35,7 @@ public:
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
private:
std::filesystem::path path;

View File

@ -4,17 +4,19 @@
#include <Common/quoteString.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <Interpreters/Context.h>
#include <Storages/StorageS3Settings.h>
#include <IO/IOThreadPool.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/S3/copyDataToS3.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/s3/S3Client.h>
#include <filesystem>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/ListObjectsRequest.h>
#include <filesystem>
namespace fs = std::filesystem;
@ -24,7 +26,6 @@ namespace DB
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
@ -150,177 +151,33 @@ bool BackupWriterS3::supportNativeCopy(DataSourceDescription data_source_descrip
return getDataSourceDescription() == data_source_description;
}
void BackupWriterS3::copyObjectImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
void BackupWriterS3::copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name)
{
LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
Aws::S3::Model::CopyObjectRequest request;
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
{
request.SetMetadata(*metadata);
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
auto outcome = client->CopyObject(request);
if (!outcome.IsSuccess() && (outcome.GetError().GetExceptionName() == "EntityTooLarge"
|| outcome.GetError().GetExceptionName() == "InvalidRequest"))
{ // Can't come here with MinIO, MinIO allows single part upload for large objects.
copyObjectMultipartImpl(src_bucket, src_key, dst_bucket, dst_key, size, metadata);
return;
}
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void BackupWriterS3::copyObjectMultipartImpl(
const String & src_bucket,
const String & src_key,
const String & dst_bucket,
const String & dst_key,
size_t size,
const std::optional<ObjectAttributes> & metadata) const
{
LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
String multipart_upload_id;
{
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(dst_bucket);
request.SetKey(dst_key);
if (metadata)
request.SetMetadata(*metadata);
auto outcome = client->CreateMultipartUpload(request);
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
multipart_upload_id = outcome.GetResult().GetUploadId();
}
std::vector<String> part_tags;
size_t position = 0;
const auto & settings = request_settings.getUploadSettings();
size_t upload_part_size = settings.min_upload_part_size;
for (size_t part_number = 1; position < size; ++part_number)
{
/// Check that part number is not too big.
if (part_number > settings.max_part_number)
{
throw Exception(
ErrorCodes::INVALID_CONFIG_PARAMETER,
"Part number exceeded {} while writing {} bytes to S3. Check min_upload_part_size = {}, max_upload_part_size = {}, "
"upload_part_size_multiply_factor = {}, upload_part_size_multiply_parts_count_threshold = {}, max_single_operation_copy_size = {}",
settings.max_part_number, size, settings.min_upload_part_size, settings.max_upload_part_size,
settings.upload_part_size_multiply_factor, settings.upload_part_size_multiply_parts_count_threshold,
settings.max_single_operation_copy_size);
}
size_t next_position = std::min(position + upload_part_size, size);
/// Make a copy request to copy a part.
Aws::S3::Model::UploadPartCopyRequest part_request;
part_request.SetCopySource(src_bucket + "/" + src_key);
part_request.SetBucket(dst_bucket);
part_request.SetKey(dst_key);
part_request.SetUploadId(multipart_upload_id);
part_request.SetPartNumber(static_cast<int>(part_number));
part_request.SetCopySourceRange(fmt::format("bytes={}-{}", position, next_position - 1));
auto outcome = client->UploadPartCopy(part_request);
if (!outcome.IsSuccess())
{
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dst_bucket);
abort_request.SetKey(dst_key);
abort_request.SetUploadId(multipart_upload_id);
client->AbortMultipartUpload(abort_request);
// In error case we throw exception later with first error from UploadPartCopy
}
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
auto etag = outcome.GetResult().GetCopyPartResult().GetETag();
part_tags.push_back(etag);
position = next_position;
/// Maybe increase `upload_part_size` (we need to increase it sometimes to keep `part_number` less or equal than `max_part_number`).
if (part_number % settings.upload_part_size_multiply_parts_count_threshold == 0)
{
upload_part_size *= settings.upload_part_size_multiply_factor;
upload_part_size = std::min(upload_part_size, settings.max_upload_part_size);
}
}
{
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(dst_bucket);
req.SetKey(dst_key);
req.SetUploadId(multipart_upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast<int>(i) + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client->CompleteMultipartUpload(req);
if (!outcome.IsSuccess())
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}
void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to)
{
if (!from_disk)
if (!src_disk)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot natively copy data to disk without source disk");
auto objects = from_disk->getStorageObjects(file_name_from);
auto objects = src_disk->getStorageObjects(src_file_name);
if (objects.size() > 1)
{
copyFileThroughBuffer(from_disk->readFile(file_name_from), file_name_to);
auto create_read_buffer = [src_disk, src_file_name] { return src_disk->readFile(src_file_name); };
copyDataToFile(create_read_buffer, src_offset, src_size, dest_file_name);
}
else
{
auto object_storage = from_disk->getObjectStorage();
std::string source_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / file_name_to;
auto size = S3::getObjectSize(*client, source_bucket, objects[0].absolute_path);
if (size < request_settings.getUploadSettings().max_single_operation_copy_size)
{
copyObjectImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
else
{
copyObjectMultipartImpl(
source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, size);
}
auto object_storage = src_disk->getObjectStorage();
std::string src_bucket = object_storage->getObjectsNamespace();
auto file_path = fs::path(s3_uri.key) / dest_file_name;
copyFileS3ToS3(client, src_bucket, objects[0].absolute_path, src_offset, src_size, s3_uri.bucket, file_path, request_settings, {},
threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
}
}
/// Forwards the copy request to copyDataToS3(): the data produced by `create_read_buffer`
/// (described by `offset` and `size`) is written to bucket `s3_uri.bucket` under the key
/// formed by joining `s3_uri.key` with `dest_file_name`.
void BackupWriterS3::copyDataToFile(
    const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name)
{
    /// Destination object key: this writer's key prefix joined with the file name.
    const auto dest_key = fs::path(s3_uri.key) / dest_file_name;

    copyDataToS3(
        create_read_buffer,
        offset,
        size,
        client,
        s3_uri.bucket,
        dest_key,
        request_settings,
        {},
        threadPoolCallbackRunner<void>(IOThreadPool::get(), "BackupWriterS3"));
}
BackupWriterS3::~BackupWriterS3() = default;

View File

@ -4,22 +4,11 @@
#if USE_AWS_S3
#include <Backups/BackupIO.h>
#include <IO/S3Common.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3Settings.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/HeadObjectRequest.h>
#include <aws/s3/model/DeleteObjectRequest.h>
#include <aws/s3/model/DeleteObjectsRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/HeadObjectResult.h>
#include <aws/s3/model/ListObjectsV2Result.h>
namespace DB
{
@ -54,12 +43,15 @@ public:
UInt64 getFileSize(const String & file_name) override;
bool fileContentsEqual(const String & file_name, const String & expected_file_contents) override;
std::unique_ptr<WriteBuffer> writeFile(const String & file_name) override;
void copyDataToFile(const CreateReadBufferFunction & create_read_buffer, UInt64 offset, UInt64 size, const String & dest_file_name) override;
void removeFile(const String & file_name) override;
void removeFiles(const Strings & file_names) override;
DataSourceDescription getDataSourceDescription() const override;
bool supportNativeCopy(DataSourceDescription data_source_description) const override;
void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
void copyFileNative(DiskPtr src_disk, const String & src_file_name, UInt64 src_offset, UInt64 src_size, const String & dest_file_name) override;
private:
void copyObjectImpl(

View File

@ -874,23 +874,18 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
/// We need to copy whole file without archive, we can do it faster
/// if source and destination are compatible
if (!use_archives && info.base_size == 0 && writer->supportNativeCopy(reader_description))
if (!use_archives && writer->supportNativeCopy(reader_description))
{
/// Should be much faster than writing data through server.
LOG_TRACE(log, "Will copy file {} using native copy", adjusted_path);
/// NOTE: `mutex` must be unlocked here otherwise writing will be in one thread maximum and hence slow.
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.data_file_name);
writer->copyFileNative(entry->tryGetDiskIfExists(), entry->getFilePath(), info.base_size, info.size - info.base_size, info.data_file_name);
}
else
{
LOG_TRACE(log, "Will copy file {} through memory buffers", adjusted_path);
auto read_buffer = entry->getReadBuffer();
/// If we have prefix in base we will seek to the start of the suffix which differs
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
LOG_TRACE(log, "Will copy file {}", adjusted_path);
if (!num_files_written)
checkLockFile(true);
@ -919,13 +914,18 @@ void BackupImpl::writeFile(const String & file_name, BackupEntryPtr entry)
coordination->updateFileInfo(info);
}
auto out = getArchiveWriter(current_archive_suffix)->writeFile(info.data_file_name);
auto read_buffer = entry->getReadBuffer();
if (info.base_size != 0)
read_buffer->seek(info.base_size, SEEK_SET);
copyData(*read_buffer, *out);
out->finalize();
}
else
{
auto create_read_buffer = [entry] { return entry->getReadBuffer(); };
/// NOTE: `mutex` must be unlocked here otherwise writing will be in one thread maximum and hence slow.
writer->copyFileThroughBuffer(std::move(read_buffer), info.data_file_name);
writer->copyDataToFile(create_read_buffer, info.base_size, info.size - info.base_size, info.data_file_name);
}
}

View File

@ -7,42 +7,71 @@ namespace DB
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int LIMIT_EXCEEDED;
}
/// Constructs the limiting buffer over a buffer passed by reference.
/// `in_` is converted with wrapSeekableReadBufferReference() and the result is handed to the
/// constructor taking std::unique_ptr, which does all the actual initialization.
/// NOTE(review): presumably the wrapper does not own `in_`, so the caller must keep it alive — confirm.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 start_offset_, UInt64 limit_size_)
: LimitSeekableReadBuffer(wrapSeekableReadBufferReference(in_), start_offset_, limit_size_)
{
}
/// Takes ownership of `in_` and remembers the window [start_offset_, start_offset_ + limit_size_)
/// of the nested buffer as `min_offset` / `max_offset`.
/// No seek is performed here: `start_offset_` is only stored in `need_seek`.
LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 start_offset_, UInt64 limit_size_)
/// The base class is initialized before any member, so `in_` is still valid when `in_->position()` is read;
/// the working buffer starts with size 0.
: SeekableReadBuffer(in_->position(), 0)
, in(std::move(in_))
, min_offset(start_offset_)
, max_offset(start_offset_ + limit_size_)
/// Always engaged after construction, even when `start_offset_` is 0.
, need_seek(start_offset_)
{
}
bool LimitSeekableReadBuffer::nextImpl()
{
if (end_position >= static_cast<off_t>(limit))
/// First let the nested buffer know the current position in the buffer (otherwise `in->eof()` or `in->seek()` below can work incorrectly).
in->position() = position();
if (need_seek)
{
/// Do actual seek.
if (in->getPosition() != *need_seek)
{
if (in->seek(*need_seek, SEEK_SET) != static_cast<off_t>(*need_seek))
{
/// Failed to seek, maybe because the new seek position is located after EOF.
set(in->position(), 0);
return false;
}
}
need_seek.reset();
}
if (in->getPosition() >= max_offset)
{
/// Limit reached.
set(in->position(), 0);
return false;
}
assert(position() >= in->position());
in->position() = position();
if (!in->next())
if (in->eof())
{
/// EOF reached.
set(in->position(), 0);
return false;
}
working_buffer = in->buffer();
pos = in->position();
end_position = in->getPosition() + in->available();
/// Adjust the size of the buffer (we don't allow to read more than `max_offset - min_offset`).
off_t size = in->buffer().size();
size = std::min(size, max_offset - in->getPosition());
if (end_position > static_cast<off_t>(limit))
if (!size || (static_cast<off_t>(in->offset()) >= size))
{
working_buffer.resize(working_buffer.size() - end_position + limit);
end_position = limit;
/// Limit reached.
set(in->position(), 0);
return false;
}
BufferBase::set(in->buffer().begin(), size, in->offset());
return true;
}
off_t LimitSeekableReadBuffer::seek(off_t off, int whence)
{
off_t new_position;
@ -52,80 +81,39 @@ off_t LimitSeekableReadBuffer::seek(off_t off, int whence)
else if (whence == SEEK_CUR)
new_position = current_position + off;
else
throw Exception("LimitSeekableReadBuffer::seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
throw Exception("Seek expects SEEK_SET or SEEK_CUR as whence", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (new_position < 0)
throw Exception("SEEK_SET underflow: off = " + std::to_string(off), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (static_cast<UInt64>(new_position) > limit)
throw Exception("SEEK_CUR shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (new_position < 0 || new_position + min_offset > max_offset)
throw Exception("Seek shift out of bounds", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
off_t change_position = new_position - current_position;
if ((working_buffer.begin() <= pos + change_position) && (pos + change_position <= working_buffer.end()))
off_t position_change = new_position - current_position;
if ((buffer().begin() <= pos + position_change) && (pos + position_change <= buffer().end()))
{
/// Position is still inside buffer.
pos += change_position;
assert(pos >= working_buffer.begin());
assert(pos <= working_buffer.end());
/// Position is still inside the buffer.
pos += position_change;
chassert(pos >= working_buffer.begin());
chassert(pos <= working_buffer.end());
return new_position;
}
in->seek(new_position, SEEK_SET);
working_buffer = in->buffer();
pos = in->position();
end_position = in->getPosition() + in->available();
/// Actual seek in the nested buffer will be performed in nextImpl().
need_seek = new_position + min_offset;
if (end_position > static_cast<off_t>(limit))
{
working_buffer.resize(working_buffer.size() - end_position + limit);
end_position = limit;
}
/// Set the size of the working buffer to zero so next call next() would call nextImpl().
set(in->position(), 0);
return new_position;
}
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_)
: SeekableReadBuffer(in_ ? in_->position() : nullptr, 0)
, in(in_)
, owns_in(owns)
, limit(limit_)
off_t LimitSeekableReadBuffer::getPosition()
{
assert(in);
if (need_seek)
return *need_seek - min_offset;
off_t current_position = in->getPosition();
if (current_position > static_cast<off_t>(limit))
throw Exception("Limit for LimitSeekableReadBuffer exceeded", ErrorCodes::LIMIT_EXCEEDED);
working_buffer = in->buffer();
pos = in->position();
end_position = current_position + in->available();
if (end_position > static_cast<off_t>(limit))
{
working_buffer.resize(working_buffer.size() - end_position + limit);
end_position = limit;
}
}
LimitSeekableReadBuffer::LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_)
: LimitSeekableReadBuffer(&in_, false, limit_)
{
}
LimitSeekableReadBuffer::LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 limit_)
: LimitSeekableReadBuffer(in_.release(), true, limit_)
{
}
LimitSeekableReadBuffer::~LimitSeekableReadBuffer()
{
/// Update underlying buffer's position in case when limit wasn't reached.
/// We have to do that because `in->getPosition()` below most likely needs to know the current position in the buffer.
in->position() = position();
if (owns_in)
delete in;
return in->getPosition() - min_offset;
}
}

View File

@ -7,26 +7,26 @@
namespace DB
{
/** Allows to read from another SeekableReadBuffer no far than the specified offset.
* Note that the nested SeekableReadBuffer may read slightly more data internally to fill its buffer.
/** Allows reading up to `limit_size` bytes from another SeekableReadBuffer, starting from `start_offset`.
* Note that the nested buffer may read slightly more data internally to fill its buffer.
*/
class LimitSeekableReadBuffer : public SeekableReadBuffer
{
public:
LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 limit_);
LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 limit_);
~LimitSeekableReadBuffer() override;
LimitSeekableReadBuffer(SeekableReadBuffer & in_, UInt64 start_offset_, UInt64 limit_size_);
LimitSeekableReadBuffer(std::unique_ptr<SeekableReadBuffer> in_, UInt64 start_offset_, UInt64 limit_size_);
/// Returns adjusted position, i.e. returns `3` if the position in the nested buffer is `start_offset + 3`.
off_t getPosition() override;
off_t seek(off_t off, int whence) override;
off_t getPosition() override { return end_position - available(); }
private:
SeekableReadBuffer * in;
bool owns_in;
UInt64 limit;
off_t end_position; /// Offset of the end of working_buffer.
std::unique_ptr<SeekableReadBuffer> in;
off_t min_offset;
off_t max_offset;
std::optional<off_t> need_seek;
LimitSeekableReadBuffer(SeekableReadBuffer * in_, bool owns, UInt64 limit_);
bool nextImpl() override;
};

47
src/IO/ReadBuffer.cpp Normal file
View File

@ -0,0 +1,47 @@
#include <IO/ReadBuffer.h>

#include <cstddef>
namespace DB
{
namespace
{
/// Proxy buffer that reads through another ReadBuffer held by reference.
/// It starts out exposing the nested buffer's current buffer and offset, and on every refill
/// it synchronizes positions with the nested buffer and mirrors the nested buffer's state.
/// `CustomData` is an arbitrary payload stored alongside the reference; the wrapper itself never reads it.
template <typename CustomData>
class ReadBufferWrapper : public ReadBuffer
{
public:
    ReadBufferWrapper(ReadBuffer & in_, CustomData && custom_data_)
        : ReadBuffer(in_.buffer().begin(), in_.buffer().size(), in_.offset())
        , in(in_)
        , custom_data(std::move(custom_data_))
    {
    }

private:
    bool nextImpl() override
    {
        /// Tell the nested buffer how far we have read before asking it for more data.
        in.position() = position();

        const bool has_data = in.next();
        if (has_data)
            BufferBase::set(in.buffer().begin(), in.buffer().size(), in.offset());
        else
            set(in.position(), 0);

        return has_data;
    }

    ReadBuffer & in;
    CustomData custom_data;
};
}
/// Wraps a buffer given by reference into a movable proxy buffer.
/// The proxy only stores the reference (its payload is a plain nullptr), so `ref` must stay alive
/// for as long as the returned wrapper is used.
std::unique_ptr<ReadBuffer> wrapReadBufferReference(ReadBuffer & ref)
{
    /// `std::nullptr_t` is spelled with its namespace: the unqualified name is only declared by <stddef.h>.
    return std::make_unique<ReadBufferWrapper<std::nullptr_t>>(ref, nullptr);
}
/// Wraps a shared buffer into a movable proxy buffer.
/// The shared pointer is stored inside the proxy, so the nested buffer stays alive
/// for as long as the returned wrapper exists.
std::unique_ptr<ReadBuffer> wrapReadBufferPointer(ReadBufferPtr ptr)
{
    /// `ptr` is already our own copy, so it is moved into the wrapper instead of being copied once more.
    /// Both arguments are only bound as references here; the actual move happens inside the constructor,
    /// after `*ptr` has been bound, so the evaluation order of the arguments does not matter.
    return std::make_unique<ReadBufferWrapper<ReadBufferPtr>>(*ptr, std::move(ptr));
}
}

View File

@ -247,62 +247,7 @@ using ReadBufferPtr = std::shared_ptr<ReadBuffer>;
/// - some just wrap the reference without ownership,
/// we need to be able to wrap reference-only buffers with movable transparent proxy-buffer.
/// The uniqueness of such wraps is responsibility of the code author.
inline std::unique_ptr<ReadBuffer> wrapReadBufferReference(ReadBuffer & buf)
{
class ReadBufferWrapper : public ReadBuffer
{
public:
explicit ReadBufferWrapper(ReadBuffer & buf_) : ReadBuffer(buf_.position(), 0), buf(buf_)
{
working_buffer = Buffer(buf.position(), buf.buffer().end());
}
private:
ReadBuffer & buf;
bool nextImpl() override
{
buf.position() = position();
if (!buf.next())
return false;
working_buffer = buf.buffer();
return true;
}
};
return std::make_unique<ReadBufferWrapper>(buf);
}
inline std::unique_ptr<ReadBuffer> wrapReadBufferPointer(ReadBufferPtr ptr)
{
class ReadBufferWrapper : public ReadBuffer
{
public:
explicit ReadBufferWrapper(ReadBufferPtr ptr_) : ReadBuffer(ptr_->position(), 0), ptr(ptr_)
{
working_buffer = Buffer(ptr->position(), ptr->buffer().end());
}
private:
ReadBufferPtr ptr;
bool nextImpl() override
{
ptr->position() = position();
if (!ptr->next())
return false;
working_buffer = ptr->buffer();
return true;
}
};
return std::make_unique<ReadBufferWrapper>(ptr);
}
std::unique_ptr<ReadBuffer> wrapReadBufferReference(ReadBuffer & ref);
std::unique_ptr<ReadBuffer> wrapReadBufferPointer(ReadBufferPtr ptr);
}

726
src/IO/S3/copyDataToS3.cpp Normal file
View File

@ -0,0 +1,726 @@
#include <IO/S3/copyDataToS3.h>
#if USE_AWS_S3
#include <Common/ProfileEvents.h>
#include <Common/typeid_cast.h>
#include <IO/LimitSeekableReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/StdStreamFromReadBuffer.h>
#include <aws/s3/S3Client.h>
#include <aws/s3/model/AbortMultipartUploadRequest.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CopyObjectRequest.h>
#include <aws/s3/model/CreateMultipartUploadRequest.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/s3/model/UploadPartCopyRequest.h>
#include <aws/s3/model/UploadPartRequest.h>
namespace ProfileEvents
{
extern const Event S3CreateMultipartUpload;
extern const Event S3CompleteMultipartUpload;
extern const Event S3UploadPart;
extern const Event S3PutObject;
}
namespace DB
{
namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int INVALID_CONFIG_PARAMETER;
extern const int LOGICAL_ERROR;
}
namespace
{
class UploadHelper
{
public:
/// Remembers everything needed to upload one object to `dest_bucket_` / `dest_key_`.
/// `dest_bucket`, `dest_key`, `settings` and `object_metadata` are stored as references (see the member
/// declarations), so the corresponding arguments must outlive this helper.
UploadHelper(
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
const Poco::Logger * log_)
: client_ptr(client_ptr_)
, dest_bucket(dest_bucket_)
, dest_key(dest_key_)
/// NOTE(review): `settings` is a reference member; this is only safe if getUploadSettings()
/// returns a reference into `request_settings_` rather than a temporary — confirm.
, settings(request_settings_.getUploadSettings())
, check_objects_after_upload(request_settings_.check_objects_after_upload)
, max_unexpected_write_error_retries(request_settings_.max_unexpected_write_error_retries)
, object_metadata(object_metadata_)
, schedule(schedule_)
, log(log_)
{
}
virtual ~UploadHelper() = default;
protected:
std::shared_ptr<const Aws::S3::S3Client> client_ptr;
const String & dest_bucket;
const String & dest_key;
const S3Settings::RequestSettings::PartUploadSettings & settings;
bool check_objects_after_upload;
size_t max_unexpected_write_error_retries;
const std::optional<std::map<String, String>> & object_metadata;
ThreadPoolCallbackRunner<void> schedule;
const Poco::Logger * log;
/// State of one part upload; instances are kept in the `bg_tasks` list.
/// NOTE(review): the code that fills these fields is outside this view; the field notes below are
/// read off the declarations only — confirm against the usage.
struct UploadPartTask
{
/// Owning pointer to the AWS request object of this task.
std::unique_ptr<Aws::AmazonWebServiceRequest> req;
bool is_finished = false;
/// Presumably the ETag returned for the uploaded part — confirm.
String tag;
/// Presumably the error captured while running the task, if any — confirm.
std::exception_ptr exception;
};
size_t normal_part_size;
String multipart_upload_id;
std::atomic<bool> multipart_upload_aborted = false;
Strings part_tags;
std::list<UploadPartTask> TSA_GUARDED_BY(bg_tasks_mutex) bg_tasks;
int num_added_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
int num_finished_bg_tasks TSA_GUARDED_BY(bg_tasks_mutex) = 0;
std::mutex bg_tasks_mutex;
std::condition_variable bg_tasks_condvar;
void createMultipartUpload()
{
Aws::S3::Model::CreateMultipartUploadRequest request;
request.SetBucket(dest_bucket);
request.SetKey(dest_key);
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
request.SetContentType("binary/octet-stream");
if (object_metadata.has_value())
request.SetMetadata(object_metadata.value());
if (!settings.storage_class_name.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
ProfileEvents::increment(ProfileEvents::S3CreateMultipartUpload);
auto outcome = client_ptr->CreateMultipartUpload(request);
if (outcome.IsSuccess())
{
multipart_upload_id = outcome.GetResult().GetUploadId();
LOG_TRACE(log, "Multipart upload has created. Bucket: {}, Key: {}, Upload id: {}", dest_bucket, dest_key, multipart_upload_id);
}
else
throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
}
/// Completes the multipart upload using the ETags collected in `part_tags`
/// (part numbers are the 1-based positions in that list).
///
/// Does nothing if the upload has already been aborted.
/// Throws Exception(S3_ERROR) if there are no parts to complete, and S3Exception if the request fails.
/// A NO_SUCH_KEY response is retried; the number of attempts is `max_unexpected_write_error_retries`,
/// but at least one. If the last attempt fails as well (with any error, including NO_SUCH_KEY),
/// the error is thrown instead of being silently treated as success.
void completeMultipartUpload()
{
    if (multipart_upload_aborted)
        return;

    LOG_TRACE(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());

    if (part_tags.empty())
        throw Exception("Failed to complete multipart upload. No parts have uploaded", ErrorCodes::S3_ERROR);

    Aws::S3::Model::CompleteMultipartUploadRequest request;
    request.SetBucket(dest_bucket);
    request.SetKey(dest_key);
    request.SetUploadId(multipart_upload_id);

    Aws::S3::Model::CompletedMultipartUpload multipart_upload;
    for (size_t i = 0; i < part_tags.size(); ++i)
    {
        Aws::S3::Model::CompletedPart part;
        multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast<int>(i + 1)));
    }

    request.SetMultipartUpload(multipart_upload);

    /// The explicit template argument keeps this compiling on platforms where size_t is not `unsigned long`.
    const size_t max_attempts = std::max<size_t>(max_unexpected_write_error_retries, 1);

    for (size_t attempt = 1; attempt <= max_attempts; ++attempt)
    {
        ProfileEvents::increment(ProfileEvents::S3CompleteMultipartUpload);
        auto outcome = client_ptr->CompleteMultipartUpload(request);

        if (outcome.IsSuccess())
        {
            LOG_TRACE(log, "Multipart upload has completed. Bucket: {}, Key: {}, Upload_id: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
            return;
        }

        const bool can_retry = (attempt < max_attempts);
        if (can_retry && outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
        {
            /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
            /// BTW, NO_SUCH_UPLOAD is expected error and we shouldn't retry it
            LOG_INFO(log, "Multipart upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Upload_id: {}, Parts: {}, will retry", dest_bucket, dest_key, multipart_upload_id, part_tags.size());
            continue;
        }

        /// Either a non-retryable error or the retries are exhausted: the upload did not complete.
        throw S3Exception(
            outcome.GetError().GetErrorType(),
            "Message: {}, Key: {}, Bucket: {}, Tags: {}",
            outcome.GetError().GetMessage(), dest_key, dest_bucket, fmt::join(part_tags.begin(), part_tags.end(), " "));
    }
}
void abortMultipartUpload()
{
LOG_TRACE(log, "Aborting multipart upload. Bucket: {}, Key: {}, Upload_id: {}", dest_bucket, dest_key, multipart_upload_id);
Aws::S3::Model::AbortMultipartUploadRequest abort_request;
abort_request.SetBucket(dest_bucket);
abort_request.SetKey(dest_key);
abort_request.SetUploadId(multipart_upload_id);
client_ptr->AbortMultipartUpload(abort_request);
multipart_upload_aborted = true;
}
/// Verifies via S3::checkObjectExists() that the destination object is there once the upload is done.
void checkObjectAfterUpload()
{
    LOG_TRACE(log, "Checking object {} exists after upload", dest_key);

    S3::checkObjectExists(
        *client_ptr,
        dest_bucket,
        dest_key,
        {},
        {},
        "Immediately after upload");

    LOG_TRACE(log, "Object {} exists after upload", dest_key);
}
/// Uploads the range [start_offset, start_offset + size) as a multipart upload:
/// chooses the part size, creates the upload, uploads the parts one after another
/// (possibly in background tasks), waits for them and completes the upload.
void performMultipartUpload(size_t start_offset, size_t size)
{
    calculatePartSize(size);
    createMultipartUpload();

    const size_t end_position = start_offset + size;
    size_t position = start_offset;
    size_t part_number = 1;

    /// Stop issuing new parts as soon as the upload has been aborted.
    while (position < end_position && !multipart_upload_aborted)
    {
        /// Every part is `normal_part_size` bytes long except possibly the final one.
        const size_t part_size = std::min(normal_part_size, end_position - position);
        uploadPart(part_number, position, part_size);
        position += part_size;
        ++part_number;
    }

    waitForAllBackGroundTasks();
    completeMultipartUpload();
}
/// Chooses the size of a normal part for uploading `total_size` bytes and stores it in `normal_part_size`.
/// Starts from `min_upload_part_size`, grows the part if that would produce more than `max_part_number` parts,
/// and caps it at `max_upload_part_size`.
/// Throws LOGICAL_ERROR for an empty file and INVALID_CONFIG_PARAMETER if the settings are invalid
/// or cannot be satisfied together.
void calculatePartSize(size_t total_size)
{
    if (!total_size)
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Chosen multipart upload for an empty file. This must not happen");

    if (!settings.max_part_number)
        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_part_number must not be 0");
    else if (!settings.min_upload_part_size)
        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "min_upload_part_size must not be 0");
    else if (settings.max_upload_part_size < settings.min_upload_part_size)
        throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "max_upload_part_size must not be less than min_upload_part_size");

    size_t part_size = settings.min_upload_part_size;
    size_t num_parts = (total_size + part_size - 1) / part_size;

    /// Too many parts: make the parts bigger so that their number fits into `max_part_number`.
    if (num_parts > settings.max_part_number)
    {
        part_size = (total_size + settings.max_part_number - 1) / settings.max_part_number;
        num_parts = (total_size + part_size - 1) / part_size;
    }

    /// Parts must not exceed the configured maximum; this can increase the number of parts again,
    /// which is detected by the check below.
    if (part_size > settings.max_upload_part_size)
    {
        part_size = settings.max_upload_part_size;
        num_parts = (total_size + part_size - 1) / part_size;
    }

    if (num_parts < 1 || num_parts > settings.max_part_number || part_size < settings.min_upload_part_size
        || part_size > settings.max_upload_part_size)
    {
        /// Each message shows both the offending value and the limit it violates.
        /// (The format strings used to have a single placeholder for two arguments, so the limit was never printed.)
        String msg;
        if (num_parts < 1)
            msg = "Number of parts is zero";
        else if (num_parts > settings.max_part_number)
            msg = fmt::format("Number of parts {} exceeds {}", num_parts, settings.max_part_number);
        else if (part_size < settings.min_upload_part_size)
            msg = fmt::format("Size of a part {} is less than {}", part_size, settings.min_upload_part_size);
        else
            msg = fmt::format("Size of a part {} exceeds {}", part_size, settings.max_upload_part_size);

        throw Exception(
            ErrorCodes::INVALID_CONFIG_PARAMETER,
            "{} while writing {} bytes to S3. Check max_part_number = {}, "
            "min_upload_part_size = {}, max_upload_part_size = {}, max_single_part_upload_size = {}",
            msg, total_size, settings.max_part_number, settings.min_upload_part_size,
            settings.max_upload_part_size, settings.max_single_part_upload_size);
    }

    /// We've calculated the size of a normal part (the final part can be smaller).
    normal_part_size = part_size;
}
/// Uploads one part of the multipart upload.
/// `part_offset` and `part_size` are passed through to fillUploadPartRequest(); an empty part is skipped.
/// If `schedule` is set, the request is built here and then executed through `schedule`,
/// with the result kept in `bg_tasks` until waitForAllBackGroundTasks() collects it.
/// Otherwise the request is executed right away and its tag is appended to `part_tags`.
void uploadPart(size_t part_number, size_t part_offset, size_t part_size)
{
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Size: {}", dest_bucket, dest_key, multipart_upload_id, part_size);
if (!part_size)
{
LOG_TRACE(log, "Skipping writing an empty part.");
return;
}
if (schedule)
{
UploadPartTask * task = nullptr;
{
/// Register the task under the mutex so that the counters and `bg_tasks` stay consistent.
std::lock_guard lock(bg_tasks_mutex);
task = &bg_tasks.emplace_back();
++num_added_bg_tasks;
}
/// Notify the waiting thread when the task has finished.
auto task_finish_notify = [this, task]()
{
std::lock_guard lock(bg_tasks_mutex);
task->is_finished = true;
++num_finished_bg_tasks;
/// Notification under mutex is important here.
/// Otherwise, the waiting side could proceed (and presumably destroy this object — TODO confirm)
/// between releasing the lock and the condvar notification.
bg_tasks_condvar.notify_one();
};
try
{
/// The request is filled on the calling thread; only its execution is scheduled.
task->req = fillUploadPartRequest(part_number, part_offset, part_size);
schedule([this, task, task_finish_notify]()
{
try
{
processUploadTask(*task);
}
catch (...)
{
/// Keep the error in the task; waitForAllBackGroundTasks() rethrows it.
task->exception = std::current_exception();
}
task_finish_notify();
}, 0);
}
catch (...)
{
/// Building or scheduling the request failed: count the task as finished anyway,
/// otherwise waitForAllBackGroundTasks() would never see the counters match.
task_finish_notify();
throw;
}
}
else
{
/// No scheduler: upload the part synchronously.
UploadPartTask task;
task.req = fillUploadPartRequest(part_number, part_offset, part_size);
processUploadTask(task);
part_tags.push_back(task.tag);
}
}
/// Executes the request prepared for `task` and stores the returned tag in it.
/// Does nothing if the multipart upload has already been aborted.
void processUploadTask(UploadPartTask & task)
{
    if (multipart_upload_aborted)
        return; /// Already aborted.

    /// The request itself is executed without holding the mutex.
    auto etag = processUploadPartRequest(*task.req);

    std::lock_guard lock(bg_tasks_mutex); /// Protect bg_tasks from race
    task.tag = std::move(etag);
    LOG_TRACE(log, "Writing part finished. Bucket: {}, Key: {}, Upload_id: {}, Etag: {}, Parts: {}", dest_bucket, dest_key, multipart_upload_id, task.tag, bg_tasks.size());
}
/// Builds the request for uploading one part. Called from uploadPart() on the calling thread.
virtual std::unique_ptr<Aws::AmazonWebServiceRequest> fillUploadPartRequest(size_t part_number, size_t part_offset, size_t part_size) = 0;
/// Executes a request built by fillUploadPartRequest() and returns the tag of the uploaded part.
/// Called from processUploadTask(), i.e. from a scheduled task when `schedule` is set.
virtual String processUploadPartRequest(Aws::AmazonWebServiceRequest & request) = 0;
/// Waits until every scheduled part upload has finished, then moves the tags of the tasks to `part_tags`
/// in the order the tasks were added.
/// If a task stored an exception, the multipart upload is aborted and that exception is rethrown.
/// Does nothing when there is no scheduler, because parts are then uploaded synchronously.
void waitForAllBackGroundTasks()
{
if (!schedule)
return;
std::unique_lock lock(bg_tasks_mutex);
/// Suppress warnings because bg_tasks_mutex is actually held, but TSA annotations do not understand std::unique_lock
bg_tasks_condvar.wait(lock, [this]() {return TSA_SUPPRESS_WARNING_FOR_READ(num_added_bg_tasks) == TSA_SUPPRESS_WARNING_FOR_READ(num_finished_bg_tasks); });
auto & tasks = TSA_SUPPRESS_WARNING_FOR_WRITE(bg_tasks);
for (auto & task : tasks)
{
if (task.exception)
{
/// abortMultipartUpload() might be called already, see processUploadPartRequest().
/// However if there were concurrent uploads at that time, those part uploads might or might not succeed.
/// As a result, it might be necessary to abort a given multipart upload multiple times in order to completely free
/// all storage consumed by all parts.
abortMultipartUpload();
std::rethrow_exception(task.exception);
}
part_tags.push_back(task.tag);
}
}
};
/// Helper class implementing copyDataToS3().
/// Uploads `size` bytes starting at `offset` of the source returned by `create_read_buffer`,
/// either with a single PutObject request or as a multipart upload.
class CopyDataToS3Helper : public UploadHelper
{
public:
    CopyDataToS3Helper(
        const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer_,
        size_t offset_,
        size_t size_,
        const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
        const String & dest_bucket_,
        const String & dest_key_,
        const S3Settings::RequestSettings & request_settings_,
        const std::optional<std::map<String, String>> & object_metadata_,
        ThreadPoolCallbackRunner<void> schedule_)
        : UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, &Poco::Logger::get("copyDataToS3"))
        , create_read_buffer(create_read_buffer_)
        , offset(offset_)
        , size(size_)
    {
    }

    /// Uploads the data: a single PutObject if it fits into `max_single_part_upload_size`, a multipart upload otherwise.
    /// Optionally checks afterwards that the object exists.
    void performCopy()
    {
        if (size <= settings.max_single_part_upload_size)
            performSinglepartUpload();
        else
            performMultipartUpload();

        if (check_objects_after_upload)
            checkObjectAfterUpload();
    }

private:
    /// Creates a new read buffer over the source; a separate buffer is created for every request.
    std::function<std::unique_ptr<SeekableReadBuffer>()> create_read_buffer;
    size_t offset;
    size_t size;

    void performSinglepartUpload()
    {
        Aws::S3::Model::PutObjectRequest request;
        fillPutRequest(request);
        processPutRequest(request);
    }

    /// Fills a PutObject request whose body reads the range [offset, offset + size) of the source.
    void fillPutRequest(Aws::S3::Model::PutObjectRequest & request)
    {
        auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), offset, size);

        request.SetBucket(dest_bucket);
        request.SetKey(dest_key);
        request.SetContentLength(size);
        request.SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), size));

        if (object_metadata.has_value())
            request.SetMetadata(object_metadata.value());

        if (!settings.storage_class_name.empty())
            request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));

        /// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
        request.SetContentType("binary/octet-stream");
    }

    /// Sends the PutObject request.
    /// Falls back to a multipart upload if S3 says the object is too large for a single request,
    /// retries on NO_SUCH_KEY while attempts remain, and throws S3Exception on any other failure
    /// (including NO_SUCH_KEY on the last attempt).
    void processPutRequest(const Aws::S3::Model::PutObjectRequest & request)
    {
        size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
        for (size_t i = 0; i < max_retry; ++i)
        {
            ProfileEvents::increment(ProfileEvents::S3PutObject);
            auto outcome = client_ptr->PutObject(request);

            if (outcome.IsSuccess())
            {
                LOG_TRACE(
                    log,
                    "Single part upload has completed. Bucket: {}, Key: {}, Object size: {}",
                    dest_bucket,
                    dest_key,
                    request.GetContentLength());
                return;
            }

            if (outcome.GetError().GetExceptionName() == "EntityTooLarge" || outcome.GetError().GetExceptionName() == "InvalidRequest")
            {
                // Can't come here with MinIO, MinIO allows single part upload for large objects.
                LOG_INFO(
                    log,
                    "Single part upload failed with error {} for Bucket: {}, Key: {}, Object size: {}, will retry with multipart upload",
                    outcome.GetError().GetExceptionName(),
                    dest_bucket,
                    dest_key,
                    size);
                performMultipartUpload();
                return;
            }

            /// Only retry while attempts remain: previously the last failed attempt fell out of the loop
            /// and the upload was reported as successful although nothing had been written.
            const bool attempts_left = (i + 1 < max_retry);
            if (attempts_left && outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
            {
                /// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
                LOG_INFO(
                    log,
                    "Single part upload failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, will retry",
                    dest_bucket,
                    dest_key,
                    request.GetContentLength());
                continue;
            }

            throw S3Exception(
                outcome.GetError().GetErrorType(),
                "Message: {}, Key: {}, Bucket: {}, Object size: {}",
                outcome.GetError().GetMessage(),
                dest_key,
                dest_bucket,
                request.GetContentLength());
        }
    }

    void performMultipartUpload() { UploadHelper::performMultipartUpload(offset, size); }

    /// Builds an UploadPart request whose body reads the range [part_offset, part_offset + part_size) of the source.
    std::unique_ptr<Aws::AmazonWebServiceRequest> fillUploadPartRequest(size_t part_number, size_t part_offset, size_t part_size) override
    {
        auto read_buffer = std::make_unique<LimitSeekableReadBuffer>(create_read_buffer(), part_offset, part_size);

        /// Setup request.
        auto request = std::make_unique<Aws::S3::Model::UploadPartRequest>();
        request->SetBucket(dest_bucket);
        request->SetKey(dest_key);
        request->SetPartNumber(static_cast<int>(part_number));
        request->SetUploadId(multipart_upload_id);
        request->SetContentLength(part_size);
        request->SetBody(std::make_unique<StdStreamFromReadBuffer>(std::move(read_buffer), part_size));

        /// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
        request->SetContentType("binary/octet-stream");

        return request;
    }

    /// Uploads one part and returns its ETag.
    /// On failure aborts the whole multipart upload and throws S3Exception.
    String processUploadPartRequest(Aws::AmazonWebServiceRequest & request) override
    {
        auto & req = typeid_cast<Aws::S3::Model::UploadPartRequest &>(request);

        ProfileEvents::increment(ProfileEvents::S3UploadPart);

        auto outcome = client_ptr->UploadPart(req);
        if (!outcome.IsSuccess())
        {
            abortMultipartUpload();
            throw S3Exception(outcome.GetError().GetMessage(), outcome.GetError().GetErrorType());
        }

        return outcome.GetResult().GetETag();
    }
};
/// Helper class to help implementing copyFileS3ToS3().
class CopyFileS3ToS3Helper : public UploadHelper
{
public:
CopyFileS3ToS3Helper(
const std::shared_ptr<const Aws::S3::S3Client> & client_ptr_,
const String & src_bucket_,
const String & src_key_,
size_t src_offset_,
size_t src_size_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, &Poco::Logger::get("copyFileS3ToS3"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
, size(src_size_)
{
}
void performCopy()
{
if (size <= settings.max_single_operation_copy_size)
performSingleOperationCopy();
else
performMultipartUploadCopy();
if (check_objects_after_upload)
checkObjectAfterUpload();
}
private:
const String & src_bucket;
const String & src_key;
size_t offset;
size_t size;
void performSingleOperationCopy()
{
Aws::S3::Model::CopyObjectRequest request;
fillCopyRequest(request);
processCopyRequest(request);
}
void fillCopyRequest(Aws::S3::Model::CopyObjectRequest & request)
{
request.SetCopySource(src_bucket + "/" + src_key);
request.SetBucket(dest_bucket);
request.SetKey(dest_key);
if (object_metadata.has_value())
{
request.SetMetadata(object_metadata.value());
request.SetMetadataDirective(Aws::S3::Model::MetadataDirective::REPLACE);
}
if (!settings.storage_class_name.empty())
request.SetStorageClass(Aws::S3::Model::StorageClassMapper::GetStorageClassForName(settings.storage_class_name));
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
request.SetContentType("binary/octet-stream");
}
void processCopyRequest(const Aws::S3::Model::CopyObjectRequest & request)
{
size_t max_retry = std::max(max_unexpected_write_error_retries, 1UL);
for (size_t i = 0; i < max_retry; ++i)
{
auto outcome = client_ptr->CopyObject(request);
if (outcome.IsSuccess())
{
LOG_TRACE(
log,
"Single operation copy has completed. Bucket: {}, Key: {}, Object size: {}",
dest_bucket,
dest_key,
size);
break;
}
else if (outcome.GetError().GetExceptionName() == "EntityTooLarge" || outcome.GetError().GetExceptionName() == "InvalidRequest")
{
// Can't come here with MinIO, MinIO allows single part upload for large objects.
LOG_INFO(
log,
"Single operation copy failed with error {} for Bucket: {}, Key: {}, Object size: {}, will retry with multipart upload copy",
outcome.GetError().GetExceptionName(),
dest_bucket,
dest_key,
size);
performMultipartUploadCopy();
break;
}
else if (outcome.GetError().GetErrorType() == Aws::S3::S3Errors::NO_SUCH_KEY)
{
/// TODO: Is it true for copy requests?
/// For unknown reason, at least MinIO can respond with NO_SUCH_KEY for put requests
LOG_INFO(
log,
"Single operation copy failed with NO_SUCH_KEY error for Bucket: {}, Key: {}, Object size: {}, will retry",
dest_bucket,
dest_key,
size);
/// will retry
}
else
{
throw S3Exception(
outcome.GetError().GetErrorType(),
"Message: {}, Key: {}, Bucket: {}, Object size: {}",
outcome.GetError().GetMessage(),
dest_key,
dest_bucket,
size);
}
}
}
void performMultipartUploadCopy() { UploadHelper::performMultipartUpload(offset, size); }
std::unique_ptr<Aws::AmazonWebServiceRequest> fillUploadPartRequest(size_t part_number, size_t part_offset, size_t part_size) override
{
auto request = std::make_unique<Aws::S3::Model::UploadPartCopyRequest>();
/// Make a copy request to copy a part.
request->SetCopySource(src_bucket + "/" + src_key);
request->SetBucket(dest_bucket);
request->SetKey(dest_key);
request->SetUploadId(multipart_upload_id);
request->SetPartNumber(static_cast<int>(part_number));
request->SetCopySourceRange(fmt::format("bytes={}-{}", part_offset, part_offset + part_size - 1));
return request;
}
String processUploadPartRequest(Aws::AmazonWebServiceRequest & request) override
{
auto & req = typeid_cast<Aws::S3::Model::UploadPartCopyRequest &>(request);
auto outcome = client_ptr->UploadPartCopy(req);
if (!outcome.IsSuccess())
{
abortMultipartUpload();
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
return outcome.GetResult().GetCopyPartResult().GetETag();
}
};
}
/// Entry point: all the work is done by a temporary CopyDataToS3Helper.
void copyDataToS3(
    const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
    size_t offset,
    size_t size,
    const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
    const String & dest_bucket,
    const String & dest_key,
    const S3Settings::RequestSettings & settings,
    const std::optional<std::map<String, String>> & object_metadata,
    ThreadPoolCallbackRunner<void> schedule)
{
    CopyDataToS3Helper(
        create_read_buffer,
        offset,
        size,
        dest_s3_client,
        dest_bucket,
        dest_key,
        settings,
        object_metadata,
        schedule)
        .performCopy();
}
/// Entry point: all the work is done by a temporary CopyFileS3ToS3Helper.
void copyFileS3ToS3(
    const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
    const String & src_bucket,
    const String & src_key,
    size_t src_offset,
    size_t src_size,
    const String & dest_bucket,
    const String & dest_key,
    const S3Settings::RequestSettings & settings,
    const std::optional<std::map<String, String>> & object_metadata,
    ThreadPoolCallbackRunner<void> schedule)
{
    CopyFileS3ToS3Helper(
        s3_client,
        src_bucket,
        src_key,
        src_offset,
        src_size,
        dest_bucket,
        dest_key,
        settings,
        object_metadata,
        schedule)
        .performCopy();
}
}
#endif

53
src/IO/S3/copyDataToS3.h Normal file
View File

@ -0,0 +1,53 @@
#pragma once
#include "config.h"
#if USE_AWS_S3
#include <Storages/StorageS3Settings.h>
#include <Interpreters/threadPoolCallbackRunner.h>
#include <base/types.h>
#include <aws/s3/S3Client.h>
#include <functional>
#include <memory>
namespace DB
{
class SeekableReadBuffer;
/// Copies data from any seekable source to S3.
/// The same can be achieved with the function copyData() and the class WriteBufferFromS3,
/// however copyDataToS3() is faster and uses less memory.
/// The callback `create_read_buffer` can be called from multiple threads in parallel, so it must be thread-safe.
/// A new read buffer is requested from the callback for every upload request.
/// The parameters `offset` and `size` specify the part of the source to copy.
/// If `schedule_` is set, parts of a multipart upload are uploaded through it.
void copyDataToS3(
const std::function<std::unique_ptr<SeekableReadBuffer>()> & create_read_buffer,
size_t offset,
size_t size,
const std::shared_ptr<const Aws::S3::S3Client> & dest_s3_client,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {});
/// Copies a file from S3 to S3.
/// The same can be achieved with the function copyData() and the classes ReadBufferFromS3 and WriteBufferFromS3,
/// however copyFileS3ToS3() is faster and uses less network traffic and memory.
/// The parameters `src_offset` and `src_size` specify the part of the source to copy.
/// If `schedule_` is set, parts of a multipart upload copy are processed through it.
void copyFileS3ToS3(
const std::shared_ptr<const Aws::S3::S3Client> & s3_client,
const String & src_bucket,
const String & src_key,
size_t src_offset,
size_t src_size,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
const std::optional<std::map<String, String>> & object_metadata = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {});
}
#endif

View File

@ -0,0 +1,63 @@
#include <IO/SeekableReadBuffer.h>
namespace DB
{
namespace
{
/// A SeekableReadBuffer which forwards all operations to another SeekableReadBuffer `in` held by reference
/// and exposes the working buffer of `in` as its own.
/// `custom_data` is only stored next to the reference; wrapSeekableReadBufferPointer() uses it
/// to hold the shared pointer owning `in`.
template <typename CustomData>
class SeekableReadBufferWrapper : public SeekableReadBuffer
{
public:
/// Starts with the same buffer and the same offset inside it as `in_` currently has.
SeekableReadBufferWrapper(SeekableReadBuffer & in_, CustomData && custom_data_)
: SeekableReadBuffer(in_.buffer().begin(), in_.buffer().size(), in_.offset())
, in(in_)
, custom_data(std::move(custom_data_))
{
}
private:
SeekableReadBuffer & in;
CustomData custom_data;
bool nextImpl() override
{
/// Data is consumed through this wrapper, so `in` must first learn how far we have read.
in.position() = position();
if (!in.next())
{
/// No more data: expose an empty buffer located at the position of `in`.
set(in.position(), 0);
return false;
}
/// Take over the buffer which `in` has just filled.
BufferBase::set(in.buffer().begin(), in.buffer().size(), in.offset());
return true;
}
off_t seek(off_t off, int whence) override
{
/// Synchronize the position first, then adopt whatever buffer `in` has after seeking.
in.position() = position();
off_t new_pos = in.seek(off, whence);
BufferBase::set(in.buffer().begin(), in.buffer().size(), in.offset());
return new_pos;
}
off_t getPosition() override
{
/// `in` computes its position itself, so it needs our current position inside the buffer.
in.position() = position();
return in.getPosition();
}
};
}
/// The wrapper does not own `ref`, so there is nothing to keep alive: the custom data is just nullptr.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferReference(SeekableReadBuffer & ref)
{
    using Wrapper = SeekableReadBufferWrapper<nullptr_t>;
    return std::make_unique<Wrapper>(ref, nullptr);
}
/// The wrapper stores a copy of the shared pointer, so the wrapped buffer lives at least as long as the wrapper.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferPointer(SeekableReadBufferPtr ptr)
{
    using Wrapper = SeekableReadBufferWrapper<SeekableReadBufferPtr>;
    SeekableReadBufferPtr keep_alive{ptr};
    return std::make_unique<Wrapper>(*ptr, std::move(keep_alive));
}
}

View File

@ -69,4 +69,9 @@ public:
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;
/// Wraps a reference to a SeekableReadBuffer into a unique pointer to SeekableReadBuffer.
/// This function is like wrapReadBufferReference() but for SeekableReadBuffer.
/// The returned wrapper only refers to `ref`, so `ref` must outlive it.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferReference(SeekableReadBuffer & ref);
/// Same as above, but the returned wrapper also holds `ptr`, keeping the wrapped buffer alive.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferPointer(SeekableReadBufferPtr ptr);
}

View File

@ -0,0 +1,111 @@
#include <IO/StdStreamBufFromReadBuffer.h>
#include <IO/SeekableReadBuffer.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int SEEK_POSITION_OUT_OF_BOUND;
}
/// Takes ownership of the buffer; seeking through SeekableReadBuffer is available only if the buffer is one.
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_, size_t size_)
    : read_buffer(std::move(read_buffer_))
    , seekable_read_buffer(dynamic_cast<SeekableReadBuffer *>(read_buffer.get()))
    , size(size_)
{
}
/// Does not take ownership: the passed buffer is wrapped by reference and must outlive this object.
StdStreamBufFromReadBuffer::StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_, size_t size_) : size(size_)
{
    auto * seekable = dynamic_cast<SeekableReadBuffer *>(&read_buffer_);
    if (!seekable)
    {
        /// Not seekable: `seekable_read_buffer` keeps its default value.
        read_buffer = wrapReadBufferReference(read_buffer_);
        return;
    }

    /// Remember the wrapper with its seekable type before storing it as a plain ReadBuffer.
    auto wrapper = wrapSeekableReadBufferReference(*seekable);
    seekable_read_buffer = wrapper.get();
    read_buffer = std::move(wrapper);
}
/// Defaulted in the source file, where SeekableReadBuffer.h is included.
StdStreamBufFromReadBuffer::~StdStreamBufFromReadBuffer() = default;
/// Returns the next character without consuming it, or eof() if there is no more data.
int StdStreamBufFromReadBuffer::underflow()
{
    char c;
    if (!read_buffer->peek(c))
        return std::char_traits<char>::eof();

    /// The character must be converted with to_int_type(): returning a plain `char` would turn
    /// the byte 0xFF into -1 where `char` is signed, which is indistinguishable from eof().
    return std::char_traits<char>::to_int_type(c);
}
/// Reports what the underlying buffer returns from available().
std::streamsize StdStreamBufFromReadBuffer::showmanyc()
{
    const auto available_bytes = read_buffer->available();
    return static_cast<std::streamsize>(available_bytes);
}
/// Reads up to `count` characters into `s` and returns what the underlying buffer's read() returns.
std::streamsize StdStreamBufFromReadBuffer::xsgetn(char_type* s, std::streamsize count)
{
    const auto bytes_read = read_buffer->read(s, count);
    return static_cast<std::streamsize>(bytes_read);
}
/// Translates a relative seek into an absolute one and delegates to seekpos().
/// Seeking from the end uses the `size` passed to the constructor.
std::streampos StdStreamBufFromReadBuffer::seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which)
{
    if (dir == std::ios_base::beg)
        return seekpos(off, which);

    if (dir == std::ios_base::cur)
        return seekpos(getCurrentPosition() + off, which);

    if (dir == std::ios_base::end)
        return seekpos(size + off, which);

    throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong seek's base {}", static_cast<int>(dir));
}
/// Moves the read position to the absolute position `pos`.
/// Tries, in this order: moving inside the current working buffer, seeking in the underlying buffer
/// if it is seekable, skipping forward with ignore().
/// Throws LOGICAL_ERROR if `which` does not include std::ios_base::in, and SEEK_POSITION_OUT_OF_BOUND
/// for a backward seek outside the working buffer of a non-seekable buffer.
std::streampos StdStreamBufFromReadBuffer::seekpos(std::streampos pos, std::ios_base::openmode which)
{
    if (!(which & std::ios_base::in))
        throw Exception(
            ErrorCodes::LOGICAL_ERROR, "Wrong seek mode {}", static_cast<int>(which));

    std::streamoff offset = pos - getCurrentPosition();
    if (!offset)
        return pos;

    /// Compare offsets rather than pointers: computing `position() + offset` for an arbitrary offset
    /// would form a pointer outside the working buffer, which is undefined behaviour.
    const std::streamoff min_offset = read_buffer->buffer().begin() - read_buffer->position();
    const std::streamoff max_offset = read_buffer->buffer().end() - read_buffer->position();
    if (min_offset <= offset && offset <= max_offset)
    {
        read_buffer->position() += offset;
        return pos;
    }

    if (seekable_read_buffer)
        return seekable_read_buffer->seek(pos, SEEK_SET);

    /// A non-seekable buffer can only move forward.
    if (offset > 0)
    {
        read_buffer->ignore(offset);
        return pos;
    }

    throw Exception(ErrorCodes::SEEK_POSITION_OUT_OF_BOUND, "Seek's offset {} is out of bound", pos);
}
/// Current absolute read position: taken from the seekable buffer if there is one,
/// otherwise from count() of the plain buffer.
std::streampos StdStreamBufFromReadBuffer::getCurrentPosition() const
{
    if (!seekable_read_buffer)
        return read_buffer->count();

    return seekable_read_buffer->getPosition();
}
/// This stream buffer is read-only: any attempt to write a sequence of characters is a logical error.
std::streamsize StdStreamBufFromReadBuffer::xsputn(const char * /* s */, std::streamsize /* n */)
{
    throw Exception(ErrorCodes::LOGICAL_ERROR, "StdStreamBufFromReadBuffer cannot be used for output");
}
/// This stream buffer is read-only: any attempt to write a character is a logical error.
int StdStreamBufFromReadBuffer::overflow(int /* c */)
{
    throw Exception(ErrorCodes::LOGICAL_ERROR, "StdStreamBufFromReadBuffer cannot be used for output");
}
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <memory>
#include <streambuf>
namespace DB
{
class ReadBuffer;
class SeekableReadBuffer;
/// `std::streambuf`-compatible wrapper around a ReadBuffer.
/// Only input is supported: the output functions xsputn() and overflow() throw.
class StdStreamBufFromReadBuffer : public std::streambuf
{
public:
using Base = std::streambuf;
/// Takes ownership of `read_buffer_`. `size_` is the value used as the end position when seeking from the end.
explicit StdStreamBufFromReadBuffer(std::unique_ptr<ReadBuffer> read_buffer_, size_t size_);
/// Wraps `read_buffer_` by reference without taking ownership.
explicit StdStreamBufFromReadBuffer(ReadBuffer & read_buffer_, size_t size_);
~StdStreamBufFromReadBuffer() override;
private:
/// Input interface of std::streambuf.
int underflow() override;
std::streamsize showmanyc() override;
std::streamsize xsgetn(char* s, std::streamsize count) override;
/// Positioning; both end up in seekpos().
std::streampos seekoff(std::streamoff off, std::ios_base::seekdir dir, std::ios_base::openmode which) override;
std::streampos seekpos(std::streampos pos, std::ios_base::openmode which) override;
/// Output interface of std::streambuf; always throws.
std::streamsize xsputn(const char* s, std::streamsize n) override;
int overflow(int c) override;
/// Current absolute read position.
std::streampos getCurrentPosition() const;
std::unique_ptr<ReadBuffer> read_buffer;
/// Set only if the wrapped buffer is a SeekableReadBuffer, otherwise nullptr.
SeekableReadBuffer * seekable_read_buffer = nullptr;
size_t size;
};
}

View File

@ -0,0 +1,39 @@
#pragma once
#include <IO/StdStreamBufFromReadBuffer.h>
#include <iostream>
#include <memory>
namespace DB
{
class ReadBuffer;
/// `std::istream`-compatible wrapper around a ReadBuffer.
/// The stream reads through its own StdStreamBufFromReadBuffer member.
class StdIStreamFromReadBuffer : public std::istream
{
public:
using Base = std::istream;
/// Takes ownership of `buf`. `size` is forwarded to StdStreamBufFromReadBuffer.
/// The base class receives only the address of `stream_buf`, which is constructed after the base.
StdIStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf, size_t size) : Base(&stream_buf), stream_buf(std::move(buf), size) { }
/// Wraps `buf` by reference without taking ownership.
StdIStreamFromReadBuffer(ReadBuffer & buf, size_t size) : Base(&stream_buf), stream_buf(buf, size) { }
/// Returns the stream buffer with its concrete type.
StdStreamBufFromReadBuffer * rdbuf() const { return const_cast<StdStreamBufFromReadBuffer *>(&stream_buf); }
private:
StdStreamBufFromReadBuffer stream_buf;
};
/// `std::iostream`-compatible wrapper around a ReadBuffer.
/// It has the interface of `std::iostream`, but the underlying StdStreamBufFromReadBuffer supports only input:
/// its output functions throw.
class StdStreamFromReadBuffer : public std::iostream
{
public:
using Base = std::iostream;
/// Takes ownership of `buf`. `size` is forwarded to StdStreamBufFromReadBuffer.
/// The base class receives only the address of `stream_buf`, which is constructed after the base.
StdStreamFromReadBuffer(std::unique_ptr<ReadBuffer> buf, size_t size) : Base(&stream_buf), stream_buf(std::move(buf), size) { }
/// Wraps `buf` by reference without taking ownership.
StdStreamFromReadBuffer(ReadBuffer & buf, size_t size) : Base(&stream_buf), stream_buf(buf, size) { }
/// Returns the stream buffer with its concrete type.
StdStreamBufFromReadBuffer * rdbuf() const { return const_cast<StdStreamBufFromReadBuffer *>(&stream_buf); }
private:
StdStreamBufFromReadBuffer stream_buf;
};
}

View File

@ -1,12 +1,14 @@
<clickhouse>
<s3>
<multipart_upload_copy>
<endpoint>http://minio1:9001/root/data/backups/multipart_upload_copy/</endpoint>
<!-- We set max_single_operation_copy_size=1 here so multipart upload copy will always be chosen for that test. -->
<multipart>
<endpoint>http://minio1:9001/root/data/backups/multipart/</endpoint>
<!-- We set max_single_part_upload_size and max_single_operation_copy_size to 1 here so
multipart upload will be chosen for that test. -->
<max_single_part_upload_size>1</max_single_part_upload_size>
<max_single_operation_copy_size>1</max_single_operation_copy_size>
<min_upload_part_size>5242880</min_upload_part_size>
<upload_part_size_multiply_parts_count_threshold>3</upload_part_size_multiply_parts_count_threshold>
<upload_part_size_multiply_factor>2</upload_part_size_multiply_factor>
</multipart_upload_copy>
</multipart>
</s3>
</clickhouse>

View File

@ -121,6 +121,16 @@ def test_backup_to_s3_named_collection():
check_backup_and_restore(storage_policy, backup_destination)
# Backs up a table from the "default" storage policy to the "multipart" S3 endpoint,
# restores it, and then checks the server log to confirm that copyDataToS3 finished
# the upload as a multipart upload.
def test_backup_to_s3_multipart():
storage_policy = "default"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
# The log line is written only when the multipart upload has been completed.
assert node.contains_in_log(
f"copyDataToS3: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}"
)
def test_backup_to_s3_native_copy():
storage_policy = "policy_s3"
backup_name = new_backup_name()
@ -129,7 +139,9 @@ def test_backup_to_s3_native_copy():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
assert node.contains_in_log(
f"copyFileS3ToS3: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
def test_backup_to_s3_native_copy_other_bucket():
@ -140,13 +152,17 @@ def test_backup_to_s3_native_copy_other_bucket():
)
check_backup_and_restore(storage_policy, backup_destination)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("single-operation copy")
assert node.contains_in_log(
f"copyFileS3ToS3: Single operation copy has completed. Bucket: root, Key: data/backups/{backup_name}"
)
def test_backup_to_s3_native_copy_multipart_upload():
def test_backup_to_s3_native_copy_multipart():
storage_policy = "policy_s3"
backup_name = new_backup_name()
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart_upload_copy/{backup_name}', 'minio', 'minio123')"
backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart/{backup_name}', 'minio', 'minio123')"
check_backup_and_restore(storage_policy, backup_destination, size=1000000)
assert node.contains_in_log("using native copy")
assert node.contains_in_log("multipart upload copy")
assert node.contains_in_log(
f"copyFileS3ToS3: Multipart upload has completed. Bucket: root, Key: data/backups/multipart/{backup_name}/"
)