From db7a87089b69744f60edd506f0287e3ae8363b35 Mon Sep 17 00:00:00 2001
From: Pavel Kovalenko
Date: Wed, 9 Dec 2020 17:09:04 +0300
Subject: [PATCH] Adaptive choice of single/multipart upload in WriteBufferFromS3.

---
 src/Core/Settings.h                       |   1 +
 src/Disks/S3/DiskS3.cpp                   |  17 +-
 src/Disks/S3/DiskS3.h                     |   2 +-
 src/Disks/S3/registerDiskS3.cpp           |   2 +-
 src/IO/WriteBufferFromS3.cpp              | 179 ++++++++++------------
 src/IO/WriteBufferFromS3.h                |  32 ++--
 src/Storages/StorageS3.cpp                |  23 ++-
 src/Storages/StorageS3.h                  |   4 +-
 src/TableFunctions/TableFunctionS3.cpp    |   2 +
 tests/integration/test_storage_s3/test.py |   3 +-
 10 files changed, 136 insertions(+), 129 deletions(-)

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 2cbe0c16cae..e9578c4c7da 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -65,6 +65,7 @@ class IColumn;
     M(UInt64, distributed_connections_pool_size, DBMS_DEFAULT_DISTRIBUTED_CONNECTIONS_POOL_SIZE, "Maximum number of connections with one remote server in the pool.", 0) \
     M(UInt64, connections_with_failover_max_tries, DBMS_CONNECTION_POOL_WITH_FAILOVER_DEFAULT_MAX_TRIES, "The maximum number of attempts to connect to replicas.", 0) \
     M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
+    M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of an object to upload using single-part upload to S3.", 0) \
     M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
     M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
     M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
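To get a feel for what the two thresholds above mean in practice, here is a small standalone snippet (illustrative only, not part of the patch; the helper is hypothetical and the part count is an approximation, since the real buffer flushes a part once it grows past the minimum part size):

    #include <cstddef>
    #include <iostream>

    // Estimate which upload path an object of a given size takes and roughly how
    // many multipart parts it produces under the two settings introduced above.
    size_t estimate_part_count(size_t object_size, size_t max_single_part, size_t min_part)
    {
        if (object_size <= max_single_part)
            return 0;                                  // single PutObject, no multipart parts
        return object_size / min_part + (object_size % min_part ? 1 : 0);
    }

    int main()
    {
        constexpr size_t MiB = 1024 * 1024;
        // Defaults: 64 MiB single-part limit, 512 MiB minimum multipart part size.
        std::cout << estimate_part_count(10 * MiB, 64 * MiB, 512 * MiB) << '\n';   // 0 -> one PutObject
        std::cout << estimate_part_count(100 * MiB, 64 * MiB, 512 * MiB) << '\n';  // 1 -> multipart, one part
        std::cout << estimate_part_count(2048 * MiB, 64 * MiB, 512 * MiB) << '\n'; // 4 -> multipart, four parts
    }

With the defaults, small objects keep the cheap single-request path, while anything larger than 64 MiB transparently switches to multipart upload.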
diff --git a/src/Disks/S3/DiskS3.cpp b/src/Disks/S3/DiskS3.cpp
index 507af58f9fa..c3d6d3dcc22 100644
--- a/src/Disks/S3/DiskS3.cpp
+++ b/src/Disks/S3/DiskS3.cpp
@@ -326,11 +326,11 @@ namespace
         const String & bucket_,
         Metadata metadata_,
         const String & s3_path_,
-        bool is_multipart,
         size_t min_upload_part_size,
+        size_t max_single_part_upload_size,
         size_t buf_size_)
         : WriteBufferFromFileBase(buf_size_, nullptr, 0)
-        , impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, is_multipart, buf_size_))
+        , impl(WriteBufferFromS3(client_ptr_, bucket_, metadata_.s3_root_path + s3_path_, min_upload_part_size, max_single_part_upload_size, buf_size_))
         , metadata(std::move(metadata_))
         , s3_path(s3_path_)
     {
@@ -521,7 +521,7 @@ DiskS3::DiskS3(
     String s3_root_path_,
     String metadata_path_,
     size_t min_upload_part_size_,
-    size_t min_multi_part_upload_size_,
+    size_t max_single_part_upload_size_,
     size_t min_bytes_for_seek_)
     : IDisk(std::make_unique())
     , name(std::move(name_))
@@ -531,7 +531,7 @@ DiskS3::DiskS3(
     , s3_root_path(std::move(s3_root_path_))
     , metadata_path(std::move(metadata_path_))
     , min_upload_part_size(min_upload_part_size_)
-    , min_multi_part_upload_size(min_multi_part_upload_size_)
+    , max_single_part_upload_size(max_single_part_upload_size_)
     , min_bytes_for_seek(min_bytes_for_seek_)
 {
 }
@@ -642,7 +642,7 @@ std::unique_ptr DiskS3::readFile(const String & path, si
     return std::make_unique(std::move(reader), min_bytes_for_seek);
 }
 
-std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t estimated_size, size_t)
+std::unique_ptr DiskS3::writeFile(const String & path, size_t buf_size, WriteMode mode, size_t, size_t)
 {
     bool exist = exists(path);
     if (exist)
@@ -653,7 +653,6 @@ std::unique_ptr DiskS3::writeFile(const String & path,
     }
     /// Path to store new S3 object.
     auto s3_path = getRandomName();
-    bool is_multipart = estimated_size >= min_multi_part_upload_size;
     if (!exist || mode == WriteMode::Rewrite)
     {
         /// If metadata file exists - remove and create new.
@@ -666,7 +665,8 @@ std::unique_ptr DiskS3::writeFile(const String & path,
         LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Write to file by path: {} New S3 path: {}",
             backQuote(metadata_path + path), s3_root_path + s3_path);
 
-        return std::make_unique(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
+        return std::make_unique(
+            client, bucket, metadata, s3_path, min_upload_part_size, max_single_part_upload_size, buf_size);
     }
     else
     {
@@ -675,7 +675,8 @@ std::unique_ptr DiskS3::writeFile(const String & path,
         LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Append to file by path: {}. New S3 path: {}. Existing S3 objects: {}.",
             backQuote(metadata_path + path), s3_root_path + s3_path, metadata.s3_objects.size());
 
-        return std::make_unique(client, bucket, metadata, s3_path, is_multipart, min_upload_part_size, buf_size);
+        return std::make_unique(
+            client, bucket, metadata, s3_path, min_upload_part_size, max_single_part_upload_size, buf_size);
     }
 }
diff --git a/src/Disks/S3/DiskS3.h b/src/Disks/S3/DiskS3.h
index fe8c47931b5..aa490d4628d 100644
--- a/src/Disks/S3/DiskS3.h
+++ b/src/Disks/S3/DiskS3.h
@@ -125,7 +125,7 @@ private:
     const String s3_root_path;
     const String metadata_path;
     size_t min_upload_part_size;
-    size_t min_multi_part_upload_size;
+    size_t max_single_part_upload_size;
    size_t min_bytes_for_seek;
 
    UInt64 reserved_bytes = 0;
diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp
index 5078a7e06ae..544a4e31721 100644
--- a/src/Disks/S3/registerDiskS3.cpp
+++ b/src/Disks/S3/registerDiskS3.cpp
@@ -147,7 +147,7 @@ void registerDiskS3(DiskFactory & factory)
             uri.key,
             metadata_path,
             context.getSettingsRef().s3_min_upload_part_size,
-            config.getUInt64(config_prefix + ".min_multi_part_upload_size", 10 * 1024 * 1024),
+            context.getSettingsRef().s3_max_single_part_upload_size,
             config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024));
 
         /// This code is used only to check access to the corresponding disk.
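To make the new call sites concrete, a minimal usage sketch of the updated WriteBufferFromS3 constructor follows (not part of the patch; the bucket and key are placeholders, and the caller is assumed to already hold a configured Aws::S3::S3Client):

    #include <memory>
    #include <IO/WriteBufferFromS3.h>
    #include <aws/s3/S3Client.h>

    void writeObjectExample(const std::shared_ptr<Aws::S3::S3Client> & client)
    {
        DB::WriteBufferFromS3 out(
            client,
            "my-bucket",            // bucket (placeholder)
            "data/object.bin",      // key (placeholder)
            512 * 1024 * 1024,      // min_upload_part_size: multipart part threshold
            64 * 1024 * 1024);      // max_single_part_upload_size: single-part limit

        out.write("hello", 5);      // buffered in memory, nothing is sent to S3 yet
        out.finalize();             // small payload -> a single PutObject
    }

With the default thresholds this call produces one PutObject request; only a write exceeding 64 MiB would make the buffer switch to multipart upload.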
diff --git a/src/IO/WriteBufferFromS3.cpp b/src/IO/WriteBufferFromS3.cpp
index a32aa4acdc9..d208483a7de 100644
--- a/src/IO/WriteBufferFromS3.cpp
+++ b/src/IO/WriteBufferFromS3.cpp
@@ -5,12 +5,9 @@
 # include
 # include
-# include
-# include
 # include
-# include
-# include
 # include
+# include
 # include
 # include
 # include
 
@@ -42,20 +39,17 @@ WriteBufferFromS3::WriteBufferFromS3(
     const String & bucket_,
     const String & key_,
     size_t minimum_upload_part_size_,
-    bool is_multipart_,
+    size_t max_single_part_upload_size_,
     size_t buffer_size_)
     : BufferWithOwnMemory(buffer_size_, nullptr, 0)
-    , is_multipart(is_multipart_)
     , bucket(bucket_)
     , key(key_)
     , client_ptr(std::move(client_ptr_))
-    , minimum_upload_part_size{minimum_upload_part_size_}
-    , temporary_buffer{std::make_unique()}
-    , last_part_size{0}
-{
-    if (is_multipart)
-        initiate();
-}
+    , minimum_upload_part_size(minimum_upload_part_size_)
+    , max_single_part_upload_size(max_single_part_upload_size_)
+    , temporary_buffer(Aws::MakeShared("temporary buffer"))
+    , last_part_size(0)
+{ }
 
 void WriteBufferFromS3::nextImpl()
 {
@@ -66,16 +60,17 @@ void WriteBufferFromS3::nextImpl()
     ProfileEvents::increment(ProfileEvents::S3WriteBytes, offset());
 
-    if (is_multipart)
-    {
-        last_part_size += offset();
+    last_part_size += offset();
 
-        if (last_part_size > minimum_upload_part_size)
-        {
-            writePart(temporary_buffer->str());
-            last_part_size = 0;
-            temporary_buffer->restart();
-        }
+    /// Data size exceeds the single-part upload threshold, so switch to multipart upload.
+    if (multipart_upload_id.empty() && last_part_size > max_single_part_upload_size)
+        createMultipartUpload();
+
+    if (!multipart_upload_id.empty() && last_part_size > minimum_upload_part_size)
+    {
+        writePart();
+        last_part_size = 0;
+        temporary_buffer = Aws::MakeShared("temporary buffer");
     }
 }
 
 void WriteBufferFromS3::finalize()
 {
@@ -86,17 +81,23 @@ void WriteBufferFromS3::finalize()
 
 void WriteBufferFromS3::finalizeImpl()
 {
-    if (!finalized)
+    if (finalized)
+        return;
+
+    next();
+
+    if (multipart_upload_id.empty())
     {
-        next();
-
-        if (is_multipart)
-            writePart(temporary_buffer->str());
-
-        complete();
-
-        finalized = true;
+        makeSinglepartUpload();
     }
+    else
+    {
+        /// Write the rest of the data as the last part.
+        writePart();
+        completeMultipartUpload();
+    }
+
+    finalized = true;
 }
 
 WriteBufferFromS3::~WriteBufferFromS3()
@@ -111,7 +112,7 @@ WriteBufferFromS3::~WriteBufferFromS3()
     }
 }
 
-void WriteBufferFromS3::initiate()
+void WriteBufferFromS3::createMultipartUpload()
 {
     Aws::S3::Model::CreateMultipartUploadRequest req;
     req.SetBucket(bucket);
@@ -121,17 +122,17 @@
 
     if (outcome.IsSuccess())
     {
-        upload_id = outcome.GetResult().GetUploadId();
-        LOG_DEBUG(log, "Multipart upload initiated. Upload id: {}", upload_id);
+        multipart_upload_id = outcome.GetResult().GetUploadId();
+        LOG_DEBUG(log, "Multipart upload has been created. Upload id: {}", multipart_upload_id);
     }
     else
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }
 
-void WriteBufferFromS3::writePart(const String & data)
+void WriteBufferFromS3::writePart()
 {
-    if (data.empty())
+    if (temporary_buffer->tellp() <= 0)
         return;
 
     if (part_tags.size() == S3_WARN_MAX_PARTS)
@@ -145,91 +146,69 @@
     req.SetBucket(bucket);
     req.SetKey(key);
     req.SetPartNumber(part_tags.size() + 1);
-    req.SetUploadId(upload_id);
-    req.SetContentLength(data.size());
-    req.SetBody(std::make_shared(data));
+    req.SetUploadId(multipart_upload_id);
+    req.SetContentLength(temporary_buffer->tellp());
+    req.SetBody(temporary_buffer);
 
     auto outcome = client_ptr->UploadPart(req);
 
-    LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, upload_id, data.size());
+    LOG_TRACE(log, "Writing part. Bucket: {}, Key: {}, Upload_id: {}, Data size: {}", bucket, key, multipart_upload_id, temporary_buffer->tellp());
 
     if (outcome.IsSuccess())
     {
         auto etag = outcome.GetResult().GetETag();
         part_tags.push_back(etag);
-        LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), upload_id, etag);
+        LOG_DEBUG(log, "Writing part finished. Total parts: {}, Upload_id: {}, Etag: {}", part_tags.size(), multipart_upload_id, etag);
     }
     else
         throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }
-
-void WriteBufferFromS3::complete()
+void WriteBufferFromS3::completeMultipartUpload()
 {
-    if (is_multipart)
+    LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, multipart_upload_id);
+
+    Aws::S3::Model::CompleteMultipartUploadRequest req;
+    req.SetBucket(bucket);
+    req.SetKey(key);
+    req.SetUploadId(multipart_upload_id);
+
+    Aws::S3::Model::CompletedMultipartUpload multipart_upload;
+    for (size_t i = 0; i < part_tags.size(); ++i)
     {
-        if (part_tags.empty())
-        {
-            LOG_DEBUG(log, "Multipart upload has no data. Aborting it. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
-
-            Aws::S3::Model::AbortMultipartUploadRequest req;
-            req.SetBucket(bucket);
-            req.SetKey(key);
-            req.SetUploadId(upload_id);
-
-            auto outcome = client_ptr->AbortMultipartUpload(req);
-
-            if (outcome.IsSuccess())
-                LOG_DEBUG(log, "Aborting multipart upload completed. Upload_id: {}", upload_id);
-            else
-                throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
-
-            return;
-        }
-
-        LOG_DEBUG(log, "Completing multipart upload. Bucket: {}, Key: {}, Upload_id: {}", bucket, key, upload_id);
-
-        Aws::S3::Model::CompleteMultipartUploadRequest req;
-        req.SetBucket(bucket);
-        req.SetKey(key);
-        req.SetUploadId(upload_id);
-
-        Aws::S3::Model::CompletedMultipartUpload multipart_upload;
-        for (size_t i = 0; i < part_tags.size(); ++i)
-        {
-            Aws::S3::Model::CompletedPart part;
-            multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
-        }
-
-        req.SetMultipartUpload(multipart_upload);
-
-        auto outcome = client_ptr->CompleteMultipartUpload(req);
-
-        if (outcome.IsSuccess())
-            LOG_DEBUG(log, "Multipart upload completed. Upload_id: {}", upload_id);
-        else
-            throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+        Aws::S3::Model::CompletedPart part;
+        multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(i + 1));
     }
+
+    req.SetMultipartUpload(multipart_upload);
+
+    auto outcome = client_ptr->CompleteMultipartUpload(req);
+
+    if (outcome.IsSuccess())
+        LOG_DEBUG(log, "Multipart upload has completed. Upload_id: {}", multipart_upload_id);
     else
-    {
-        LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
+        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
+}
 
-        Aws::S3::Model::PutObjectRequest req;
-        req.SetBucket(bucket);
-        req.SetKey(key);
+void WriteBufferFromS3::makeSinglepartUpload()
+{
+    if (temporary_buffer->tellp() <= 0)
+        return;
 
-        /// This could be improved using an adapter to WriteBuffer.
-        const std::shared_ptr input_data = Aws::MakeShared("temporary buffer", temporary_buffer->str());
-        temporary_buffer = std::make_unique();
-        req.SetBody(input_data);
+    LOG_DEBUG(log, "Making single part upload. Bucket: {}, Key: {}", bucket, key);
 
-        auto outcome = client_ptr->PutObject(req);
+    Aws::S3::Model::PutObjectRequest req;
+    req.SetBucket(bucket);
+    req.SetKey(key);
+    req.SetContentLength(temporary_buffer->tellp());
+    req.SetBody(temporary_buffer);
 
-        if (outcome.IsSuccess())
-            LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
-        else
-            throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
-    }
+    auto outcome = client_ptr->PutObject(req);
+
+    if (outcome.IsSuccess())
+        LOG_DEBUG(log, "Single part upload has completed. Bucket: {}, Key: {}", bucket, key);
+    else
+        throw Exception(outcome.GetError().GetMessage(), ErrorCodes::S3_ERROR);
 }
 
 }
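Taken together, the changes above turn WriteBufferFromS3 into a small state machine. The following condensed, self-contained sketch (illustrative only; State and the free functions are stand-ins, not the real class) mirrors the control flow of nextImpl() and finalizeImpl():

    #include <cstddef>
    #include <iostream>
    #include <string>

    struct State
    {
        std::string multipart_upload_id;                        /// Empty until the multipart path is entered.
        size_t last_part_size = 0;                              /// Bytes accumulated for the current part.
        size_t minimum_upload_part_size = 512 * 1024 * 1024;    /// s3_min_upload_part_size
        size_t max_single_part_upload_size = 64 * 1024 * 1024;  /// s3_max_single_part_upload_size
    };

    void createMultipartUpload(State & s) { s.multipart_upload_id = "fake-id"; std::cout << "CreateMultipartUpload\n"; }
    void writePart(const State & s) { std::cout << "UploadPart, " << s.last_part_size << " bytes\n"; }
    void completeMultipartUpload(const State &) { std::cout << "CompleteMultipartUpload\n"; }
    void makeSinglepartUpload(const State & s) { std::cout << "PutObject, " << s.last_part_size << " bytes\n"; }

    /// Mirrors nextImpl(): called every time the owning buffer flushes `flushed` new bytes.
    void onNext(State & s, size_t flushed)
    {
        s.last_part_size += flushed;

        /// Accumulated data no longer fits a single-part upload: switch to multipart lazily.
        if (s.multipart_upload_id.empty() && s.last_part_size > s.max_single_part_upload_size)
            createMultipartUpload(s);

        /// In multipart mode, flush a part as soon as it is large enough.
        if (!s.multipart_upload_id.empty() && s.last_part_size > s.minimum_upload_part_size)
        {
            writePart(s);
            s.last_part_size = 0;
        }
    }

    /// Mirrors finalizeImpl(): called once, when the buffer is finalized.
    void onFinalize(State & s)
    {
        if (s.multipart_upload_id.empty())
            makeSinglepartUpload(s);     /// Everything fits into one PutObject.
        else
        {
            writePart(s);                /// The remaining tail becomes the last part.
            completeMultipartUpload(s);
        }
    }

    int main()
    {
        State s;
        onNext(s, 16 * 1024 * 1024);     /// Small write: stays on the single-part path.
        onFinalize(s);                   /// Prints "PutObject, 16777216 bytes".
    }

The key difference from the old code is that the single/multipart decision is no longer made up front from an estimated size (or hard-coded by the caller): the buffer commits to multipart only once the written data actually outgrows the single-part limit.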
diff --git a/src/IO/WriteBufferFromS3.h b/src/IO/WriteBufferFromS3.h
index 1a1e859d913..9e7b5ce50f5 100644
--- a/src/IO/WriteBufferFromS3.h
+++ b/src/IO/WriteBufferFromS3.h
@@ -6,11 +6,13 @@
 # include
 # include
+# include
 # include
+
 # include
-# include
 # include
-# include
+
+# include
 
 namespace Aws::S3
 {
 
@@ -19,23 +21,29 @@ class S3Client;
 
 namespace DB
 {
-/* Perform S3 HTTP PUT request.
+
+/**
+ * Buffer to write data to an S3 object with a specified bucket and key.
+ * If the size of the data written to the buffer is less than 'max_single_part_upload_size', the write is performed using a single-part upload.
+ * Otherwise, a multipart upload is used:
+ * data is divided into chunks with a size greater than 'minimum_upload_part_size' (the last chunk can be smaller than this threshold)
+ * and each chunk is written to S3 as a separate part.
 */
 class WriteBufferFromS3 : public BufferWithOwnMemory
 {
 private:
-    bool is_multipart;
-
     String bucket;
     String key;
     std::shared_ptr client_ptr;
     size_t minimum_upload_part_size;
-    std::unique_ptr temporary_buffer;
+    size_t max_single_part_upload_size;
+    /// Buffer to accumulate data.
+    std::shared_ptr temporary_buffer;
     size_t last_part_size;
 
     /// Upload in S3 is made in parts.
     /// We initiate upload, then upload each part and get ETag as a response, and then finish upload with listing all our parts.
-    String upload_id;
+    String multipart_upload_id;
     std::vector part_tags;
 
     Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");
@@ -46,7 +54,7 @@ public:
         const String & bucket_,
         const String & key_,
         size_t minimum_upload_part_size_,
-        bool is_multipart,
+        size_t max_single_part_upload_size_,
         size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);
 
     void nextImpl() override;
@@ -59,9 +67,11 @@ public:
 private:
     bool finalized = false;
 
-    void initiate();
-    void writePart(const String & data);
-    void complete();
+    void createMultipartUpload();
+    void writePart();
+    void completeMultipartUpload();
+
+    void makeSinglepartUpload();
 
     void finalizeImpl();
 };
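The member comment above summarizes the ETag bookkeeping. Condensed into one standalone helper (an illustration assembled from the .cpp changes in this patch, not code the patch itself adds), completing a multipart upload means replaying the collected ETags with 1-based part numbers:

    #include <cstddef>
    #include <vector>
    #include <aws/core/utils/memory/stl/AWSString.h>
    #include <aws/s3/model/CompletedMultipartUpload.h>
    #include <aws/s3/model/CompletedPart.h>
    #include <aws/s3/model/CompleteMultipartUploadRequest.h>

    Aws::S3::Model::CompleteMultipartUploadRequest makeCompleteRequest(
        const Aws::String & bucket,
        const Aws::String & key,
        const Aws::String & upload_id,
        const std::vector<Aws::String> & part_tags)
    {
        Aws::S3::Model::CompleteMultipartUploadRequest req;
        req.SetBucket(bucket);
        req.SetKey(key);
        req.SetUploadId(upload_id);

        Aws::S3::Model::CompletedMultipartUpload multipart_upload;
        for (size_t i = 0; i < part_tags.size(); ++i)
        {
            // Part numbers are 1-based and must match the order of the UploadPart calls.
            Aws::S3::Model::CompletedPart part;
            multipart_upload.AddParts(part.WithETag(part_tags[i]).WithPartNumber(static_cast<int>(i) + 1));
        }

        req.SetMultipartUpload(multipart_upload);
        return req;
    }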
diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp
index ea9319d4693..a533231934a 100644
--- a/src/Storages/StorageS3.cpp
+++ b/src/Storages/StorageS3.cpp
@@ -141,17 +141,18 @@ namespace
     public:
         StorageS3BlockOutputStream(
             const String & format,
-            UInt64 min_upload_part_size,
             const Block & sample_block_,
             const Context & context,
             const CompressionMethod compression_method,
             const std::shared_ptr & client,
             const String & bucket,
-            const String & key)
+            const String & key,
+            size_t min_upload_part_size,
+            size_t max_single_part_upload_size)
             : sample_block(sample_block_)
         {
             write_buf = wrapWriteBufferWithCompressionMethod(
-                std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, true), compression_method, 3);
+                std::make_unique<WriteBufferFromS3>(client, bucket, key, min_upload_part_size, max_single_part_upload_size), compression_method, 3);
             writer = FormatFactory::instance().getOutput(format, *write_buf, sample_block, context);
         }
 
@@ -192,6 +193,7 @@ StorageS3::StorageS3(
     const StorageID & table_id_,
     const String & format_name_,
     UInt64 min_upload_part_size_,
+    UInt64 max_single_part_upload_size_,
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     const Context & context_,
@@ -201,6 +203,7 @@ StorageS3::StorageS3(
     , global_context(context_.getGlobalContext())
     , format_name(format_name_)
     , min_upload_part_size(min_upload_part_size_)
+    , max_single_part_upload_size(max_single_part_upload_size_)
     , compression_method(compression_method_)
     , name(uri_.storage_name)
 {
@@ -325,9 +328,15 @@ Pipe StorageS3::read(
 BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, const Context & /*context*/)
 {
     return std::make_shared<StorageS3BlockOutputStream>(
-        format_name, min_upload_part_size, metadata_snapshot->getSampleBlock(),
-        global_context, chooseCompressionMethod(uri.endpoint, compression_method),
-        client, uri.bucket, uri.key);
+        format_name,
+        metadata_snapshot->getSampleBlock(),
+        global_context,
+        chooseCompressionMethod(uri.endpoint, compression_method),
+        client,
+        uri.bucket,
+        uri.key,
+        min_upload_part_size,
+        max_single_part_upload_size);
 }
 
 void registerStorageS3Impl(const String & name, StorageFactory & factory)
@@ -356,6 +365,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
         }
 
         UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
+        UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
 
         String compression_method;
         String format_name;
@@ -377,6 +387,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
             args.table_id,
             format_name,
             min_upload_part_size,
+            max_single_part_upload_size,
             args.columns,
             args.constraints,
             args.context,
diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h
index 96f0cf02e88..f436fb85c90 100644
--- a/src/Storages/StorageS3.h
+++ b/src/Storages/StorageS3.h
@@ -31,6 +31,7 @@ public:
         const StorageID & table_id_,
         const String & format_name_,
         UInt64 min_upload_part_size_,
+        UInt64 max_single_part_upload_size_,
         const ColumnsDescription & columns_,
         const ConstraintsDescription & constraints_,
         const Context & context_,
@@ -59,7 +60,8 @@ private:
     const Context & global_context;
 
     String format_name;
-    UInt64 min_upload_part_size;
+    size_t min_upload_part_size;
+    size_t max_single_part_upload_size;
     String compression_method;
     std::shared_ptr client;
     String name;
diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp
index dfe1cf6e792..cc7877b204e 100644
--- a/src/TableFunctions/TableFunctionS3.cpp
+++ b/src/TableFunctions/TableFunctionS3.cpp
@@ -67,6 +67,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
     Poco::URI uri (filename);
     S3::URI s3_uri (uri);
     UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
+    UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
 
     StoragePtr storage = StorageS3::create(
         s3_uri,
@@ -75,6 +76,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
         StorageID(getDatabaseName(), table_name),
         format,
         min_upload_part_size,
+        max_single_part_upload_size,
         getActualTableStructure(context),
         ConstraintsDescription{},
         const_cast(context),
diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py
index ca7206ca3b5..9e49c5ebf8f 100644
--- a/tests/integration/test_storage_s3/test.py
+++ b/tests/integration/test_storage_s3/test.py
@@ -306,7 +306,8 @@ def test_multipart_put(cluster, maybe_auth, positive):
         cluster.minio_redirect_host, cluster.minio_redirect_port, bucket, filename, maybe_auth, table_format)
 
     try:
-        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes})
+        run_query(instance, put_query, stdin=csv_data, settings={'s3_min_upload_part_size': min_part_size_bytes,
+                                                                 's3_max_single_part_upload_size': 0})
     except helpers.client.QueryRuntimeException:
         if positive:
             raise