Adaptive choice of single/multipart upload in WriteBufferFromS3.

Pavel Kovalenko 2020-12-09 17:09:04 +03:00
parent 7d77a7ba24
commit db7a87089b
10 changed files with 136 additions and 129 deletions
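
The change replaces the constructor-time is_multipart flag with a runtime decision inside WriteBufferFromS3: written data is accumulated in a temporary buffer; once the accumulated size exceeds s3_max_single_part_upload_size, a multipart upload is created and parts of at least s3_min_upload_part_size are flushed, otherwise the whole object is sent with a single PutObject on finalize. A condensed, standalone sketch of that decision flow follows (the AWS SDK calls are replaced by hypothetical logging stubs; only the control flow mirrors nextImpl()/finalizeImpl()):

/// Condensed sketch of the adaptive upload decision (hypothetical stubs instead of the AWS SDK).
#include <iostream>
#include <sstream>
#include <string>

class AdaptiveS3Writer
{
public:
    AdaptiveS3Writer(size_t min_part_size_, size_t max_single_part_size_)
        : min_part_size(min_part_size_), max_single_part_size(max_single_part_size_) {}

    void write(const std::string & chunk)
    {
        buffer << chunk;
        accumulated += chunk.size();

        /// Accumulated data exceeds the singlepart threshold: switch to multipart upload.
        if (upload_id.empty() && accumulated > max_single_part_size)
            upload_id = createMultipartUpload();

        /// While a multipart upload is active, flush parts of at least min_part_size.
        if (!upload_id.empty() && accumulated > min_part_size)
        {
            uploadPart(buffer.str());
            buffer.str("");
            accumulated = 0;
        }
    }

    void finalize()
    {
        if (upload_id.empty())
            putObject(buffer.str());     /// Small object: a single PUT.
        else
        {
            uploadPart(buffer.str());    /// Rest of the data goes out as the last part.
            completeMultipartUpload();
        }
    }

private:
    /// Stubs standing in for the corresponding S3 API requests.
    std::string createMultipartUpload() { std::cout << "CreateMultipartUpload\n"; return "upload-1"; }
    void uploadPart(const std::string & part) { if (!part.empty()) std::cout << "UploadPart, " << part.size() << " bytes\n"; }
    void completeMultipartUpload() { std::cout << "CompleteMultipartUpload\n"; }
    void putObject(const std::string & data) { std::cout << "PutObject, " << data.size() << " bytes\n"; }

    size_t min_part_size;
    size_t max_single_part_size;
    std::ostringstream buffer;
    size_t accumulated = 0;
    std::string upload_id;
};

int main()
{
    AdaptiveS3Writer small_object(/* min_part_size */ 8, /* max_single_part_size */ 16);
    small_object.write("tiny");
    small_object.finalize();            /// -> PutObject, 4 bytes

    AdaptiveS3Writer big_object(8, 16);
    for (int i = 0; i < 4; ++i)
        big_object.write("0123456789"); /// 40 bytes in total, above the 16-byte threshold.
    big_object.finalize();              /// -> CreateMultipartUpload, several UploadPart calls, CompleteMultipartUpload
}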


@@ -65,6 +65,7 @@ class IColumn;
M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \


@@ -326,11 +326,11 @@ namespace
const String & bucket_,
Metadata metadata_,
const String & s3_path_,
bool is_multipart,
size_t min_upload_part_size,
size_t max_single_part_upload_size,
size_t buf_size_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart, buf_size_))
, impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, max_single_part_upload_size, buf_size_))
, metadata(std::move(metadata_))
, s3_path(s3_path_)
{
@@ -521,7 +521,7 @@ DiskS3::DiskS3(
String s3_root_path_,
String metadata_path_,
size_t min_upload_part_size_,
size_t min_multi_part_upload_size_,
size_t max_single_part_upload_size_,
size_t min_bytes_for_seek_)
: IDisk(std::make_unique<AsyncExecutor>())
, name(std::move(name_))
@@ -531,7 +531,7 @@ DiskS3::DiskS3(
, s3_root_path(std::move(s3_root_path_))
, metadata_path(std::move(metadata_path_))
, min_upload_part_size(min_upload_part_size_)
, min_multi_part_upload_size(min_multi_part_upload_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, min_bytes_for_seek(min_bytes_for_seek_)
{
}
@@ -642,7 +642,7 @@ std::unique_ptr<ReadBufferFromFileBase> DiskS3::readFile(const String & path, si
return std::make_unique<SeekAvoidingReadBuffer>(std::move(reader), min_bytes_for_seek);
}
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t)
std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
{
bool exist = exists(path);
if (exist)
@@ -653,7 +653,6 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
}
/// Path to store new S3 object.
auto s3_path = getRandomName();
bool is_multipart = estimated_size >= min_multi_part_upload_size;
if (!exist || mode == WriteMode::Rewrite)
{
/// If metadata file exists - remove and create new.
@@ -666,7 +665,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}", backQuote(metadata_path + path), s3_root_path + s3_path);
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, min_upload_part_size, max_single_part_upload_size, buf_size);
}
else
{
@@ -675,7 +675,8 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
return std::make_unique<WriteIndirectBufferFromS3>(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
return std::make_unique<WriteIndirectBufferFromS3>(
client, bucket, metadata, s3_path, min_upload_part_size, max_single_part_upload_size, buf_size);
}
}


@@ -125,7 +125,7 @@ private:
const String s3_root_path;
const String metadata_path;
size_t min_upload_part_size;
size_t min_multi_part_upload_size;
size_t max_single_part_upload_size;
size_t min_bytes_for_seek;
UInt64 reserved_bytes = 0;


@@ -147,7 +147,7 @@ void registerDiskS3(DiskFactory & factory)
uri.key,
metadata_path,
context.getSettingsRef().s3_min_upload_part_size,
config.getUInt64(config_prefix + ".min_multi_part_upload_size", 10 * 1024 * 1024),
context.getSettingsRef().s3_max_single_part_upload_size,
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
/// This code is used only to check access to the corresponding disk.


@@ -5,12 +5,9 @@
# include <IO/WriteBufferFromS3.h>
# include <IO/WriteHelpers.h>
# include <aws/core/utils/memory/stl/AWSStreamFwd.h>
# include <aws/core/utils/memory/stl/AWSStringStream.h>
# include <aws/s3/S3Client.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/AbortMultipartUploadRequest.h>
# include <aws/s3/model/CreateMultipartUploadRequest.h>
# include <aws/s3/model/CompleteMultipartUploadRequest.h>
# include <aws/s3/model/PutObjectRequest.h>
# include <aws/s3/model/UploadPartRequest.h>
# include <common/logger_useful.h>
@@ -42,20 +39,17 @@ WriteBufferFromS3::WriteBufferFromS3(
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart_,
size_t max_single_part_upload_size_,
size_t buffer_size_)
: BufferWithOwnMemory<WriteBuffer>(buffer_size_, nullptr, 0)
, is_multipart(is_multipart_)
, bucket(bucket_)
, key(key_)
, client_ptr(std::move(client_ptr_))
, minimum_upload_part_size{minimum_upload_part_size_}
, temporary_buffer{std::make_unique<WriteBufferFromOwnString>()}
, last_part_size{0}
{
if (is_multipart)
initiate();
}
, minimum_upload_part_size(minimum_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, temporary_buffer(Aws::MakeShared<Aws::StringStream>("temporary buffer"))
, last_part_size(0)
{ }
void WriteBufferFromS3::nextImpl()
{
@@ -66,16 +60,17 @@ void WriteBufferFromS3::nextImpl()
ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
if (is_multipart)
{
last_part_size += offset();
last_part_size += offset();
if (last_part_size > minimum_upload_part_size)
{
writePart(temporary_buffer->str());
last_part_size = 0;
temporary_buffer->restart();
}
/// Data size exceeds singlepart upload threshold, need to use multipart upload.
if (multipart_upload_id.empty() && last_part_size > max_single_part_upload_size)
createMultipartUpload();
if (!multipart_upload_id.empty() && last_part_size > minimum_upload_part_size)
{
writePart();
last_part_size = 0;
temporary_buffer = Aws::MakeShared<Aws::StringStream>("temporary buffer");
}
}
@@ -86,17 +81,23 @@ void WriteBufferFromS3::finalize()
void WriteBufferFromS3::finalizeImpl()
{
if (!finalized)
if (finalized)
return;
next();
if (multipart_upload_id.empty())
{
next();
if (is_multipart)
writePart(temporary_buffer->str());
complete();
finalized = true;
makeSinglepartUpload();
}
else
{
/// Write rest of the data as last part.
writePart();
completeMultipartUpload();
}
finalized = true;
}
WriteBufferFromS3::~WriteBufferFromS3()
@@ -111,7 +112,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
}
}
void WriteBufferFromS3::initiate()
void WriteBufferFromS3::createMultipartUpload()
{
Aws::S3::Model::CreateMultipartUploadRequest req;
req.SetBucket(bucket);
@@ -121,17 +122,17 @@ void WriteBufferFromS3::initiate()
if (outcome.IsSuccess())
{
upload_id = outcome.GetResult().GetUploadId();
LOG_DEBUG(log, "Multipart upload initiated. Upload id: {}", upload_id);
multipart_upload_id = outcome.GetResult().GetUploadId();
LOG_DEBUG(log, "Multipart upload has created. Upload id: {}", multipart_upload_id);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void WriteBufferFromS3::writePart(const String & data)
void WriteBufferFromS3::writePart()
{
if (data.empty())
if (temporary_buffer->tellp() <= 0)
return;
if (part_tags.size() == S3_WARN_MAX_PARTS)
@@ -145,91 +146,69 @@ void WriteBufferFromS3::writePart(const String & data)
req.SetBucket(bucket);
req.SetKey(key);
req.SetPartNumber(part_tags.size() + 1);
req.SetUploadId(upload_id);
req.SetContentLength(data.size());
req.SetBody(std::make_shared<Aws::StringStream>(data));
req.SetUploadId(multipart_upload_id);
req.SetContentLength(temporary_buffer->tellp());
req.SetBody(temporary_buffer);
auto outcome = client_ptr->UploadPart(req);
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, upload_id, data.size());
LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, multipart_upload_id, temporary_buffer->tellp());
if (outcome.IsSuccess())
{
auto etag = outcome.GetResult().GetETag();
part_tags.push_back(etag);
LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), upload_id, etag);
LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), multipart_upload_id, etag);
}
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
void WriteBufferFromS3::complete()
void WriteBufferFromS3::completeMultipartUpload()
{
if (is_multipart)
LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, multipart_upload_id);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(multipart_upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
if (part_tags.empty())
{
LOG_DEBUG(log, "Multipart upload has no data. Aborting it. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
Aws::S3::Model::AbortMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(upload_id);
auto outcome = client_ptr->AbortMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Aborting multipart upload completed. Upload_id: {}", upload_id);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
return;
}
LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
Aws::S3::Model::CompleteMultipartUploadRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetUploadId(upload_id);
Aws::S3::Model::CompletedMultipartUpload multipart_upload;
for (size_t i = 0; i < part_tags.size(); ++i)
{
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client_ptr->CompleteMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Multipart upload completed. Upload_id: {}", upload_id);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
Aws::S3::Model::CompletedPart part;
multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
}
req.SetMultipartUpload(multipart_upload);
auto outcome = client_ptr->CompleteMultipartUpload(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Multipart upload has completed. Upload_id: {}", multipart_upload_id);
else
{
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
void WriteBufferFromS3::makeSinglepartUpload()
{
if (temporary_buffer->tellp() <= 0)
return;
/// This could be improved using an adapter to WriteBuffer.
const std::shared_ptr<Aws::IOStream> input_data = Aws::MakeShared<Aws::StringStream>("temporary buffer", temporary_buffer->str());
temporary_buffer = std::make_unique<WriteBufferFromOwnString>();
req.SetBody(input_data);
LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
auto outcome = client_ptr->PutObject(req);
Aws::S3::Model::PutObjectRequest req;
req.SetBucket(bucket);
req.SetKey(key);
req.SetContentLength(temporary_buffer->tellp());
req.SetBody(temporary_buffer);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
auto outcome = client_ptr->PutObject(req);
if (outcome.IsSuccess())
LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
else
throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
}
}


@@ -6,11 +6,13 @@
# include <memory>
# include <vector>
# include <common/logger_useful.h>
# include <common/types.h>
# include <IO/BufferWithOwnMemory.h>
# include <IO/HTTPCommon.h>
# include <IO/WriteBuffer.h>
# include <IO/WriteBufferFromString.h>
# include <aws/core/utils/memory/stl/AWSStringStream.h>
namespace Aws::S3
{
@@ -19,23 +21,29 @@ class S3Client;
namespace DB
{
/* Perform S3 HTTP PUT request.
/**
* Buffer to write a data to a S3 object with specified bucket and key.
* If data size written to the buffer is less than 'max_single_part_upload_size' write is performed using singlepart upload.
* In another case multipart upload is used:
* Data is divided on chunks with size greater than 'minimum_upload_part_size'. Last chunk can be less than this threshold.
* Each chunk is written as a part to S3.
*/
class WriteBufferFromS3 : public BufferWithOwnMemory<WriteBuffer>
{
private:
bool is_multipart;
String bucket;
String key;
std::shared_ptr<Aws::S3::S3Client> client_ptr;
size_t minimum_upload_part_size;
std::unique_ptr<WriteBufferFromOwnString> temporary_buffer;
size_t max_single_part_upload_size;
/// Buffer to accumulate data.
std::shared_ptr<Aws::StringStream> temporary_buffer;
size_t last_part_size;
/// Upload in S3 is made in parts.
/// We initiate upload, then upload each part and get ETag as a response, and then finish upload with listing all our parts.
String upload_id;
String multipart_upload_id;
std::vector<String> part_tags;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
@@ -46,7 +54,7 @@ public:
const String & bucket_,
const String & key_,
size_t minimum_upload_part_size_,
bool is_multipart,
size_t max_single_part_upload_size_,
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
void nextImpl() override;
@@ -59,9 +67,11 @@ public:
private:
bool finalized = false;
void initiate();
void writePart(const String & data);
void complete();
void createMultipartUpload();
void writePart();
void completeMultipartUpload();
void makeSinglepartUpload();
void finalizeImpl();
};
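
From the caller's point of view the only change is the constructor signature: the is_multipart flag disappears and max_single_part_upload_size is passed instead, exactly as DiskS3 and StorageS3 do above. A hedged caller-side sketch (the function name, bucket and key are made up for illustration; assumes the ClickHouse/AWS SDK build environment and an already configured client):

#include <memory>
#include <string>
#include <IO/WriteBufferFromS3.h>
#include <aws/s3/S3Client.h>

/// Whether the object ends up as a single PutObject or a multipart upload is now
/// decided inside WriteBufferFromS3; the caller only supplies the two thresholds.
void writeObject(
    const std::shared_ptr<Aws::S3::S3Client> & client,
    size_t min_upload_part_size,           /// e.g. settings.s3_min_upload_part_size
    size_t max_single_part_upload_size)    /// e.g. settings.s3_max_single_part_upload_size
{
    DB::WriteBufferFromS3 out(
        client,
        "my-bucket",                       /// hypothetical bucket
        "data/object.bin",                 /// hypothetical key
        min_upload_part_size,
        max_single_part_upload_size);

    const std::string payload = "hello, s3";
    out.write(payload.data(), payload.size());
    out.finalize();
}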


@@ -141,17 +141,18 @@ namespace
public:
StorageS3BlockOutputStream(
const String & format,
UInt64 min_upload_part_size,
const Block & sample_block_,
const Context & context,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
const String & key)
const String & key,
size_t min_upload_part_size,
size_t max_single_part_upload_size)
: sample_block(sample_block_)
{
write_buf = wrapWriteBufferWithCompressionMethod(
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, true), compression_method, 3);
std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
}
@@ -192,6 +193,7 @@ StorageS3::StorageS3(
const StorageID & table_id_,
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
@@ -201,6 +203,7 @@ StorageS3::StorageS3(
, global_context(context_.getGlobalContext())
, format_name(format_name_)
, min_upload_part_size(min_upload_part_size_)
, max_single_part_upload_size(max_single_part_upload_size_)
, compression_method(compression_method_)
, name(uri_.storage_name)
{
@@ -325,9 +328,15 @@ Pipe StorageS3::read(
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
{
return std::make_shared<StorageS3BlockOutputStream>(
format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
global_context, chooseCompressionMethod(uri.endpoint, compression_method),
client, uri.bucket, uri.key);
format_name,
metadata_snapshot->getSampleBlock(),
global_context,
chooseCompressionMethod(uri.endpoint, compression_method),
client,
uri.bucket,
uri.key,
min_upload_part_size,
max_single_part_upload_size);
}
void registerStorageS3Impl(const String & name, StorageFactory & factory)
@@ -356,6 +365,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
}
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
String compression_method;
String format_name;
@@ -377,6 +387,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
args.table_id,
format_name,
min_upload_part_size,
max_single_part_upload_size,
args.columns,
args.constraints,
args.context,


@@ -31,6 +31,7 @@ public:
const StorageID & table_id_,
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
@@ -59,7 +60,8 @@ private:
const Context & global_context;
String format_name;
UInt64 min_upload_part_size;
size_t min_upload_part_size;
size_t max_single_part_upload_size;
String compression_method;
std::shared_ptr<Aws::S3::S3Client> client;
String name;


@@ -67,6 +67,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
Poco::URI uri (filename);
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
StoragePtr storage = StorageS3::create(
s3_uri,
@@ -75,6 +76,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
StorageID(getDatabaseName(), table_name),
format,
min_upload_part_size,
max_single_part_upload_size,
getActualTableStructure(context),
ConstraintsDescription{},
const_cast<Context &>(context),


@@ -306,7 +306,8 @@ def test_multipart_put(cluster, maybe_auth, positive):
cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
try:
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes,
's3_max_single_part_upload_size': 0})
except helpers.client.QueryRuntimeException:
if positive:
raise