diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp
index 10d35ed7fc1..48785c244e7 100644
--- a/src/Backups/BackupIO_S3.cpp
+++ b/src/Backups/BackupIO_S3.cpp
@@ -126,6 +126,7 @@ BackupWriterS3::BackupWriterS3(
     , max_single_read_retries(context_->getSettingsRef().s3_max_single_read_retries)
     , read_settings(context_->getReadSettings())
     , rw_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).rw_settings)
+    , log(&Poco::Logger::get("BackupWriterS3"))
 {
     rw_settings.updateFromSettingsIfEmpty(context_->getSettingsRef());
 }
@@ -146,9 +147,12 @@ void BackupWriterS3::copyObjectImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    std::optional<Aws::S3::Model::HeadObjectResult> head,
-    std::optional<ObjectAttributes> metadata) const
+    const Aws::S3::Model::HeadObjectResult & head,
+    const std::optional<ObjectAttributes> & metadata) const
 {
+    size_t size = head.GetContentLength();
+    LOG_TRACE(log, "Copying {} bytes using single-operation copy", size);
+
     Aws::S3::Model::CopyObjectRequest request;
     request.SetCopySource(src_bucket + "/" + src_key);
     request.SetBucket(dst_bucket);
@@ -186,13 +190,11 @@ void BackupWriterS3::copyObjectMultipartImpl(
     const String & src_key,
     const String & dst_bucket,
     const String & dst_key,
-    std::optional<Aws::S3::Model::HeadObjectResult> head,
-    std::optional<ObjectAttributes> metadata) const
+    const Aws::S3::Model::HeadObjectResult & head,
+    const std::optional<ObjectAttributes> & metadata) const
 {
-    if (!head)
-        head = requestObjectHeadData(src_bucket, src_key).GetResult();
-
-    size_t size = head->GetContentLength();
+    size_t size = head.GetContentLength();
+    LOG_TRACE(log, "Copying {} bytes using multipart upload copy", size);
 
     String multipart_upload_id;
 
@@ -289,15 +291,14 @@ void BackupWriterS3::copyFileNative(DiskPtr from_disk, const String & file_name_
     auto file_path = fs::path(s3_uri.key) / file_name_to;
 
     auto head = requestObjectHeadData(source_bucket, objects[0].absolute_path).GetResult();
-    static constexpr int64_t multipart_upload_threashold = 5UL * 1024 * 1024 * 1024;
-    if (head.GetContentLength() >= multipart_upload_threashold)
+    if (static_cast<size_t>(head.GetContentLength()) < rw_settings.max_single_operation_copy_size)
     {
-        copyObjectMultipartImpl(
+        copyObjectImpl(
             source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
     }
     else
     {
-        copyObjectImpl(
+        copyObjectMultipartImpl(
             source_bucket, objects[0].absolute_path, s3_uri.bucket, file_path, head);
     }
 }
diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h
index 471ddcc06e6..b52de23e262 100644
--- a/src/Backups/BackupIO_S3.h
+++ b/src/Backups/BackupIO_S3.h
@@ -61,7 +61,6 @@ public:
     void copyFileNative(DiskPtr from_disk, const String & file_name_from, const String & file_name_to) override;
 
 private:
-
     Aws::S3::Model::HeadObjectOutcome requestObjectHeadData(const std::string & bucket_from, const std::string & key) const;
 
     void copyObjectImpl(
@@ -69,22 +68,23 @@ private:
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
-        std::optional<ObjectAttributes> metadata = std::nullopt) const;
+        const Aws::S3::Model::HeadObjectResult & head,
+        const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
 
     void copyObjectMultipartImpl(
         const String & src_bucket,
         const String & src_key,
         const String & dst_bucket,
         const String & dst_key,
-        std::optional<Aws::S3::Model::HeadObjectResult> head = std::nullopt,
-        std::optional<ObjectAttributes> metadata = std::nullopt) const;
+        const Aws::S3::Model::HeadObjectResult & head,
+        const std::optional<ObjectAttributes> & metadata = std::nullopt) const;
 
     S3::URI s3_uri;
     std::shared_ptr<Aws::S3::S3Client> client;
     UInt64 max_single_read_retries;
     ReadSettings read_settings;
     S3Settings::ReadWriteSettings rw_settings;
+    Poco::Logger * log;
 };
 
 }
diff --git a/src/Storages/StorageS3Settings.cpp b/src/Storages/StorageS3Settings.cpp
index 65e9bb1ab8c..7a89168b49a 100644
--- a/src/Storages/StorageS3Settings.cpp
+++ b/src/Storages/StorageS3Settings.cpp
@@ -5,13 +5,20 @@
 #include
 #include
 #include
-
+#include <base/unit.h>
 #include
 
 
 namespace DB
 {
 
+namespace
+{
+    /// An object up to 5 GB can be copied in a single atomic operation.
+    constexpr UInt64 DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE = 5_GiB;
+}
+
+
 void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config, const Settings & settings)
 {
     std::lock_guard lock(mutex);
@@ -53,6 +60,7 @@ void StorageS3Settings::loadFromConfig(const String & config_elem, const Poco::U
         rw_settings.upload_part_size_multiply_factor = get_uint_for_key(key, "upload_part_size_multiply_factor", true, settings.s3_upload_part_size_multiply_factor);
         rw_settings.upload_part_size_multiply_parts_count_threshold = get_uint_for_key(key, "upload_part_size_multiply_parts_count_threshold", true, settings.s3_upload_part_size_multiply_parts_count_threshold);
         rw_settings.max_single_part_upload_size = get_uint_for_key(key, "max_single_part_upload_size", true, settings.s3_max_single_part_upload_size);
+        rw_settings.max_single_operation_copy_size = get_uint_for_key(key, "max_single_operation_copy_size", true, DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE);
         rw_settings.max_connections = get_uint_for_key(key, "max_connections", true, settings.s3_max_connections);
         rw_settings.check_objects_after_upload = get_bool_for_key(key, "check_objects_after_upload", true, false);
 
@@ -101,6 +109,8 @@ void S3Settings::ReadWriteSettings::updateFromSettingsIfEmpty(const Settings & s
         upload_part_size_multiply_parts_count_threshold = settings.s3_upload_part_size_multiply_parts_count_threshold;
     if (!max_single_part_upload_size)
         max_single_part_upload_size = settings.s3_max_single_part_upload_size;
+    if (!max_single_operation_copy_size)
+        max_single_operation_copy_size = DEFAULT_MAX_SINGLE_OPERATION_COPY_SIZE;
     if (!max_connections)
         max_connections = settings.s3_max_connections;
     if (!max_unexpected_write_error_retries)
diff --git a/src/Storages/StorageS3Settings.h b/src/Storages/StorageS3Settings.h
index 2da4a1d7590..b3584acc6f1 100644
--- a/src/Storages/StorageS3Settings.h
+++ b/src/Storages/StorageS3Settings.h
@@ -30,6 +30,7 @@ struct S3Settings
         size_t upload_part_size_multiply_factor = 0;
         size_t upload_part_size_multiply_parts_count_threshold = 0;
         size_t max_single_part_upload_size = 0;
+        size_t max_single_operation_copy_size = 0;
         size_t max_connections = 0;
         bool check_objects_after_upload = false;
         size_t max_unexpected_write_error_retries = 0;
@@ -44,6 +45,7 @@ struct S3Settings
             && upload_part_size_multiply_factor == other.upload_part_size_multiply_factor
             && upload_part_size_multiply_parts_count_threshold == other.upload_part_size_multiply_parts_count_threshold
             && max_single_part_upload_size == other.max_single_part_upload_size
+            && max_single_operation_copy_size == other.max_single_operation_copy_size
             && max_connections == other.max_connections
             && check_objects_after_upload == other.check_objects_after_upload
             && max_unexpected_write_error_retries == other.max_unexpected_write_error_retries;
diff --git a/tests/integration/test_backup_restore_s3/configs/s3_settings.xml b/tests/integration/test_backup_restore_s3/configs/s3_settings.xml
new file mode 100644
index 00000000000..2aef4db55c8
--- /dev/null
+++ b/tests/integration/test_backup_restore_s3/configs/s3_settings.xml
@@ -0,0 +1,12 @@
+<clickhouse>
+    <s3>
+        <multipart_upload_copy>
+            <endpoint>http://minio1:9001/root/data/backups/multipart_upload_copy/</endpoint>
+
+            <max_single_operation_copy_size>1</max_single_operation_copy_size>
+            <min_upload_part_size>5242880</min_upload_part_size>
+            <upload_part_size_multiply_factor>3</upload_part_size_multiply_factor>
+            <upload_part_size_multiply_parts_count_threshold>2</upload_part_size_multiply_parts_count_threshold>
+        </multipart_upload_copy>
+    </s3>
+</clickhouse>
diff --git a/tests/integration/test_backup_restore_s3/test.py b/tests/integration/test_backup_restore_s3/test.py
index 617c14d6736..7ddb1459ab9 100644
--- a/tests/integration/test_backup_restore_s3/test.py
+++ b/tests/integration/test_backup_restore_s3/test.py
@@ -4,7 +4,11 @@ from helpers.cluster import ClickHouseCluster
 cluster = ClickHouseCluster(__file__)
 node = cluster.add_instance(
     "node",
-    main_configs=["configs/disk_s3.xml", "configs/named_collection_s3_backups.xml"],
+    main_configs=[
+        "configs/disk_s3.xml",
+        "configs/named_collection_s3_backups.xml",
+        "configs/s3_settings.xml",
+    ],
     with_minio=True,
 )
 
@@ -27,17 +31,17 @@ def new_backup_name():
     return f"backup{backup_id_counter}"
 
 
-def check_backup_and_restore(storage_policy, backup_destination):
+def check_backup_and_restore(storage_policy, backup_destination, size=1000):
     node.query(
         f"""
     DROP TABLE IF EXISTS data NO DELAY;
     CREATE TABLE data (key Int, value String, array Array(String)) Engine=MergeTree() ORDER BY tuple() SETTINGS storage_policy='{storage_policy}';
-    INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT 1000;
+    INSERT INTO data SELECT * FROM generateRandom('key Int, value String, array Array(String)') LIMIT {size};
     BACKUP TABLE data TO {backup_destination};
     RESTORE TABLE data AS data_restored FROM {backup_destination};
     SELECT throwIf(
-        (SELECT groupArray(tuple(*)) FROM data) !=
-        (SELECT groupArray(tuple(*)) FROM data_restored),
+        (SELECT count(), sum(sipHash64(*)) FROM data) !=
+        (SELECT count(), sum(sipHash64(*)) FROM data_restored),
         'Data does not matched after BACKUP/RESTORE'
     );
     DROP TABLE data NO DELAY;
@@ -106,9 +110,10 @@ def test_backup_to_s3_native_copy():
     )
     check_backup_and_restore(storage_policy, backup_destination)
     assert node.contains_in_log("using native copy")
+    assert node.contains_in_log("single-operation copy")
 
 
-def test_backup_to_s3_other_bucket_native_copy():
+def test_backup_to_s3_native_copy_other_bucket():
     storage_policy = "policy_s3_other_bucket"
     backup_name = new_backup_name()
     backup_destination = (
@@ -116,3 +121,13 @@ def test_backup_to_s3_other_bucket_native_copy():
     )
     check_backup_and_restore(storage_policy, backup_destination)
     assert node.contains_in_log("using native copy")
+    assert node.contains_in_log("single-operation copy")
+
+
+def test_backup_to_s3_native_copy_multipart_upload():
+    storage_policy = "policy_s3"
+    backup_name = new_backup_name()
+    backup_destination = f"S3('http://minio1:9001/root/data/backups/multipart_upload_copy/{backup_name}', 'minio', 'minio123')"
+    check_backup_and_restore(storage_policy, backup_destination, size=1000000)
+    assert node.contains_in_log("using native copy")
+    assert node.contains_in_log("multipart upload copy")