Use long timeout for S3 copy requests

This commit is contained in:
Michael Kolupaev 2023-08-17 19:36:40 +00:00
parent 81af60eeea
commit 557bfea4d8
4 changed files with 33 additions and 16 deletions

View File

@ -159,6 +159,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
blob_path.size(), mode);
copyS3File(
client,
client,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
@ -218,6 +219,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
{
LOG_TRACE(log, "Copying file {} from disk {} to S3", src_path, src_disk->getName());
copyS3File(
client,
client,
/* src_bucket */ blob_path[1],
/* src_key= */ blob_path[0],
@ -238,7 +240,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}

View File

@ -431,11 +431,11 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ptr = clients.get()->client;
auto clients_ = clients.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(client_ptr, bucket, object_from.remote_path, 0, size, dest_s3->bucket, object_to.remote_path,
copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, dest_s3->bucket, object_to.remote_path,
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
}
else
@ -447,11 +447,11 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ptr = clients.get()->client;
auto clients_ = clients.get();
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto size = S3::getObjectSize(*clients_->client, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
copyS3File(client_ptr, bucket, object_from.remote_path, 0, size, bucket, object_to.remote_path,
copyS3File(clients_->client, clients_->client_with_long_timeout, bucket, object_from.remote_path, 0, size, bucket, object_to.remote_path,
settings_ptr->request_settings, object_to_attributes, scheduler, /* for_disk_s3= */ true);
}

View File

@ -53,6 +53,7 @@ namespace
public:
UploadHelper(
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
@ -61,6 +62,7 @@ namespace
bool for_disk_s3_,
const Poco::Logger * log_)
: client_ptr(client_ptr_)
, client_with_long_timeout_ptr(client_with_long_timeout_ptr_)
, dest_bucket(dest_bucket_)
, dest_key(dest_key_)
, request_settings(request_settings_)
@ -76,6 +78,7 @@ namespace
protected:
std::shared_ptr<const S3::Client> client_ptr;
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
const String & dest_bucket;
const String & dest_key;
const S3Settings::RequestSettings & request_settings;
@ -176,7 +179,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
auto outcome = client_ptr->CompleteMultipartUpload(request);
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(request);
if (outcome.IsSuccess())
{
@ -430,13 +433,14 @@ namespace
size_t offset_,
size_t size_,
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & dest_bucket_,
const String & dest_key_,
const S3Settings::RequestSettings & request_settings_,
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyDataToS3File"))
, create_read_buffer(create_read_buffer_)
, offset(offset_)
, size(size_)
@ -598,6 +602,7 @@ namespace
public:
CopyFileHelper(
const std::shared_ptr<const S3::Client> & client_ptr_,
const std::shared_ptr<const S3::Client> & client_with_long_timeout_ptr_,
const String & src_bucket_,
const String & src_key_,
size_t src_offset_,
@ -608,7 +613,7 @@ namespace
const std::optional<std::map<String, String>> & object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
bool for_disk_s3_)
: UploadHelper(client_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
: UploadHelper(client_ptr_, client_with_long_timeout_ptr_, dest_bucket_, dest_key_, request_settings_, object_metadata_, schedule_, for_disk_s3_, &Poco::Logger::get("copyS3File"))
, src_bucket(src_bucket_)
, src_key(src_key_)
, offset(src_offset_)
@ -669,7 +674,7 @@ namespace
/// If we don't do it, AWS SDK can mistakenly set it to application/xml, see https://github.com/aws/aws-sdk-cpp/issues/1840
request.SetContentType("binary/octet-stream");
client_ptr->setKMSHeaders(request);
client_with_long_timeout_ptr->setKMSHeaders(request);
}
void processCopyRequest(const S3::CopyObjectRequest & request)
@ -681,7 +686,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3CopyObject);
auto outcome = client_ptr->CopyObject(request);
auto outcome = client_with_long_timeout_ptr->CopyObject(request);
if (outcome.IsSuccess())
{
LOG_TRACE(
@ -706,6 +711,7 @@ namespace
offset,
size,
client_ptr,
client_with_long_timeout_ptr,
dest_bucket,
dest_key,
request_settings,
@ -779,7 +785,7 @@ namespace
if (for_disk_s3)
ProfileEvents::increment(ProfileEvents::DiskS3UploadPartCopy);
auto outcome = client_ptr->UploadPartCopy(req);
auto outcome = client_with_long_timeout_ptr->UploadPartCopy(req);
if (!outcome.IsSuccess())
{
abortMultipartUpload();
@ -797,6 +803,7 @@ void copyDataToS3File(
size_t offset,
size_t size,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,
@ -804,13 +811,14 @@ void copyDataToS3File(
ThreadPoolCallbackRunner<void> schedule,
bool for_disk_s3)
{
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
CopyDataToFileHelper helper{create_read_buffer, offset, size, dest_s3_client, dest_s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
helper.performCopy();
}
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
const String & src_bucket,
const String & src_key,
size_t src_offset,
@ -824,7 +832,7 @@ void copyS3File(
{
if (settings.allow_native_copy)
{
CopyFileHelper helper{s3_client, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
CopyFileHelper helper{s3_client, s3_client_with_long_timeout, src_bucket, src_key, src_offset, src_size, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3};
helper.performCopy();
}
else
@ -833,7 +841,7 @@ void copyS3File(
{
return std::make_unique<ReadBufferFromS3>(s3_client, src_bucket, src_key, "", settings, Context::getGlobalContextInstance()->getReadSettings());
};
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
copyDataToS3File(create_read_buffer, src_offset, src_size, s3_client, s3_client_with_long_timeout, dest_bucket, dest_key, settings, object_metadata, schedule, for_disk_s3);
}
}

View File

@ -26,8 +26,14 @@ using CreateReadBuffer = std::function<std::unique_ptr<SeekableReadBuffer>()>;
/// has been disabled (with settings.allow_native_copy) or request failed
/// because it is a known issue, it falls back to read-write copy
/// (copyDataToS3File()).
///
/// s3_client_with_long_timeout (may be equal to s3_client) is used for native copy and
/// CompleteMultipartUpload requests. These requests need a longer timeout because S3 servers often
/// block on them for multiple seconds without sending data to us or receiving data from us (possibly
/// because the servers are copying data internally or throttling; the exact cause is unknown).
void copyS3File(
const std::shared_ptr<const S3::Client> & s3_client,
const std::shared_ptr<const S3::Client> & s3_client_with_long_timeout,
const String & src_bucket,
const String & src_key,
size_t src_offset,
@ -49,6 +55,7 @@ void copyDataToS3File(
size_t offset,
size_t size,
const std::shared_ptr<const S3::Client> & dest_s3_client,
const std::shared_ptr<const S3::Client> & dest_s3_client_with_long_timeout,
const String & dest_bucket,
const String & dest_key,
const S3Settings::RequestSettings & settings,