Merge pull request #54900 from vitlibar/retry-backup-s3-operations-after-conection-reset

Retry backup S3 operations after connection reset failure
This commit is contained in:
Vitaly Baranov 2023-09-26 18:36:10 +02:00 committed by GitHub
commit fe008c23c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 32 additions and 21 deletions

View File

@@ -32,11 +32,13 @@ namespace ErrorCodes
namespace
{
std::shared_ptr<S3::Client>
makeS3Client(const S3::URI & s3_uri, const String & access_key_id, const String & secret_access_key, const ContextPtr & context)
std::shared_ptr<S3::Client> makeS3Client(
const S3::URI & s3_uri,
const String & access_key_id,
const String & secret_access_key,
const S3Settings & settings,
const ContextPtr & context)
{
auto settings = context->getStorageS3Settings().getSettings(s3_uri.uri.toString());
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
HTTPHeaderEntries headers;
if (access_key_id.empty())
@@ -45,13 +47,15 @@ namespace
headers = settings.auth_settings.headers;
}
const auto & request_settings = settings.request_settings;
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
settings.auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler,
/* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler,
s3_uri.uri.getScheme());
client_configuration.endpointOverride = s3_uri.endpoint;
@@ -60,6 +64,7 @@ namespace
client_configuration.connectTimeoutMs = 10 * 1000;
/// Requests in backups can be extremely long, set to one hour
client_configuration.requestTimeoutMs = 60 * 60 * 1000;
client_configuration.retryStrategy = std::make_shared<Aws::Client::DefaultRetryStrategy>(request_settings.retry_attempts);
return S3::ClientFactory::instance().create(
client_configuration,
@@ -112,13 +117,14 @@ BackupReaderS3::BackupReaderS3(
const ContextPtr & context_)
: BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, data_source_description{DataSourceType::S3, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
}
BackupReaderS3::~BackupReaderS3() = default;
@@ -139,7 +145,7 @@ UInt64 BackupReaderS3::getFileSize(const String & file_name)
std::unique_ptr<SeekableReadBuffer> BackupReaderS3::readFile(const String & file_name)
{
return std::make_unique<ReadBufferFromS3>(
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings);
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, s3_settings.request_settings, read_settings);
}
void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup,
@@ -169,7 +175,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s
file_size,
/* dest_bucket= */ blob_path[1],
/* dest_key= */ blob_path[0],
request_settings,
s3_settings.request_settings,
read_settings,
object_attributes,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupReaderS3"),
@@ -198,14 +204,15 @@ BackupWriterS3::BackupWriterS3(
const ContextPtr & context_)
: BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterS3"))
, s3_uri(s3_uri_)
, client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_))
, request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings)
, data_source_description{DataSourceType::S3, s3_uri.endpoint, false, false}
, s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()))
{
auto & request_settings = s3_settings.request_settings;
request_settings.updateFromSettings(context_->getSettingsRef());
request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint
request_settings.allow_native_copy = allow_s3_native_copy;
request_settings.setStorageClassName(storage_class_name);
client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_);
}
void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path,
@@ -230,7 +237,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
length,
s3_uri.bucket,
fs::path(s3_uri.key) / path_in_backup,
request_settings,
s3_settings.request_settings,
read_settings,
{},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
@@ -244,7 +251,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src
void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length)
{
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {},
copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {},
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"));
}
@@ -266,7 +273,7 @@ UInt64 BackupWriterS3::getFileSize(const String & file_name)
std::unique_ptr<ReadBuffer> BackupWriterS3::readFile(const String & file_name, size_t expected_file_size)
{
return std::make_unique<ReadBufferFromS3>(
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings,
client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, s3_settings.request_settings, read_settings,
false, 0, 0, false, expected_file_size);
}
@@ -278,7 +285,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
s3_settings.request_settings,
std::nullopt,
threadPoolCallbackRunner<void>(getBackupsIOThreadPool().get(), "BackupWriterS3"),
write_settings);

View File

@@ -29,9 +29,9 @@ public:
private:
const S3::URI s3_uri;
const std::shared_ptr<S3::Client> client;
S3Settings::RequestSettings request_settings;
const DataSourceDescription data_source_description;
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
};
@@ -57,10 +57,10 @@ private:
void removeFilesBatch(const Strings & file_names);
const S3::URI s3_uri;
const std::shared_ptr<S3::Client> client;
S3Settings::RequestSettings request_settings;
std::optional<bool> supports_batch_delete;
const DataSourceDescription data_source_description;
S3Settings s3_settings;
std::shared_ptr<S3::Client> client;
std::optional<bool> supports_batch_delete;
};
}

View File

@@ -27,6 +27,8 @@
#include <Common/logger_useful.h>
#include <Common/ProxyConfigurationResolverProvider.h>
#include <base/sleep.h>
namespace ProfileEvents
{
@@ -599,7 +601,9 @@ Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn r
last_exception = std::current_exception();
auto error = Aws::Client::AWSError<Aws::Client::CoreErrors>(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true);
client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no);
auto sleep_ms = client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no);
LOG_WARNING(log, "Request failed, now waiting {} ms before attempting again", sleep_ms);
sleepForMilliseconds(sleep_ms);
continue;
}
}