From 1e567d5008b01809e9b5361a1b3385477271659a Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Thu, 21 Sep 2023 18:50:35 +0200 Subject: [PATCH] Retry backup s3 operations after ConnectionResetException. --- src/Backups/BackupIO_S3.cpp | 37 ++++++++++++++++++++++--------------- src/Backups/BackupIO_S3.h | 10 +++++----- src/IO/S3/Client.cpp | 6 +++++- 3 files changed, 32 insertions(+), 21 deletions(-) diff --git a/src/Backups/BackupIO_S3.cpp b/src/Backups/BackupIO_S3.cpp index 5b08683b157..8bb2f895e38 100644 --- a/src/Backups/BackupIO_S3.cpp +++ b/src/Backups/BackupIO_S3.cpp @@ -32,11 +32,13 @@ namespace ErrorCodes namespace { - std::shared_ptr - makeS3Client(const S3::URI & s3_uri, const String & access_key_id, const String & secret_access_key, const ContextPtr & context) + std::shared_ptr makeS3Client( + const S3::URI & s3_uri, + const String & access_key_id, + const String & secret_access_key, + const S3Settings & settings, + const ContextPtr & context) { - auto settings = context->getStorageS3Settings().getSettings(s3_uri.uri.toString()); - Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key); HTTPHeaderEntries headers; if (access_key_id.empty()) @@ -45,13 +47,15 @@ namespace headers = settings.auth_settings.headers; } + const auto & request_settings = settings.request_settings; + S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration( settings.auth_settings.region, context->getRemoteHostFilter(), static_cast(context->getGlobalContext()->getSettingsRef().s3_max_redirects), static_cast(context->getGlobalContext()->getSettingsRef().s3_retry_attempts), context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging, - /* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler, + /* for_disk_s3 = */ false, request_settings.get_request_throttler, request_settings.put_request_throttler, s3_uri.uri.getScheme()); client_configuration.endpointOverride = s3_uri.endpoint; @@ -60,6 +64,7 @@ namespace client_configuration.connectTimeoutMs = 10 * 1000; /// Requests in backups can be extremely long, set to one hour client_configuration.requestTimeoutMs = 60 * 60 * 1000; + client_configuration.retryStrategy = std::make_shared(request_settings.retry_attempts); return S3::ClientFactory::instance().create( client_configuration, @@ -112,13 +117,14 @@ BackupReaderS3::BackupReaderS3( const ContextPtr & context_) : BackupReaderDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupReaderS3")) , s3_uri(s3_uri_) - , client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_)) - , request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings) , data_source_description{DataSourceType::S3, s3_uri.endpoint, false, false} + , s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString())) { + auto & request_settings = s3_settings.request_settings; request_settings.updateFromSettings(context_->getSettingsRef()); request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint request_settings.allow_native_copy = allow_s3_native_copy; + client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_); } BackupReaderS3::~BackupReaderS3() = default; @@ -139,7 +145,7 @@ UInt64 BackupReaderS3::getFileSize(const String & file_name) std::unique_ptr BackupReaderS3::readFile(const String & file_name) { return std::make_unique( - client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings); + client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, s3_settings.request_settings, read_settings); } void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_size, bool encrypted_in_backup, @@ -169,7 +175,7 @@ void BackupReaderS3::copyFileToDisk(const String & path_in_backup, size_t file_s file_size, /* dest_bucket= */ blob_path[1], /* dest_key= */ blob_path[0], - request_settings, + s3_settings.request_settings, read_settings, object_attributes, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupReaderS3"), @@ -198,14 +204,15 @@ BackupWriterS3::BackupWriterS3( const ContextPtr & context_) : BackupWriterDefault(read_settings_, write_settings_, &Poco::Logger::get("BackupWriterS3")) , s3_uri(s3_uri_) - , client(makeS3Client(s3_uri_, access_key_id_, secret_access_key_, context_)) - , request_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString()).request_settings) , data_source_description{DataSourceType::S3, s3_uri.endpoint, false, false} + , s3_settings(context_->getStorageS3Settings().getSettings(s3_uri.uri.toString())) { + auto & request_settings = s3_settings.request_settings; request_settings.updateFromSettings(context_->getSettingsRef()); request_settings.max_single_read_retries = context_->getSettingsRef().s3_max_single_read_retries; // FIXME: Avoid taking value for endpoint request_settings.allow_native_copy = allow_s3_native_copy; request_settings.setStorageClassName(storage_class_name); + client = makeS3Client(s3_uri_, access_key_id_, secret_access_key_, s3_settings, context_); } void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src_disk, const String & src_path, @@ -230,7 +237,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src length, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, - request_settings, + s3_settings.request_settings, read_settings, {}, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupWriterS3")); @@ -244,7 +251,7 @@ void BackupWriterS3::copyFileFromDisk(const String & path_in_backup, DiskPtr src void BackupWriterS3::copyDataToFile(const String & path_in_backup, const CreateReadBufferFunction & create_read_buffer, UInt64 start_pos, UInt64 length) { - copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, request_settings, {}, + copyDataToS3File(create_read_buffer, start_pos, length, client, client, s3_uri.bucket, fs::path(s3_uri.key) / path_in_backup, s3_settings.request_settings, {}, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupWriterS3")); } @@ -266,7 +273,7 @@ UInt64 BackupWriterS3::getFileSize(const String & file_name) std::unique_ptr BackupWriterS3::readFile(const String & file_name, size_t expected_file_size) { return std::make_unique( - client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, request_settings, read_settings, + client, s3_uri.bucket, fs::path(s3_uri.key) / file_name, s3_uri.version_id, s3_settings.request_settings, read_settings, false, 0, 0, false, expected_file_size); } @@ -278,7 +285,7 @@ std::unique_ptr BackupWriterS3::writeFile(const String & file_name) s3_uri.bucket, fs::path(s3_uri.key) / file_name, DBMS_DEFAULT_BUFFER_SIZE, - request_settings, + s3_settings.request_settings, std::nullopt, threadPoolCallbackRunner(getBackupsIOThreadPool().get(), "BackupWriterS3"), write_settings); diff --git a/src/Backups/BackupIO_S3.h b/src/Backups/BackupIO_S3.h index a29c91498ec..4abcbedf89f 100644 --- a/src/Backups/BackupIO_S3.h +++ b/src/Backups/BackupIO_S3.h @@ -29,9 +29,9 @@ public: private: const S3::URI s3_uri; - const std::shared_ptr client; - S3Settings::RequestSettings request_settings; const DataSourceDescription data_source_description; + S3Settings s3_settings; + std::shared_ptr client; }; @@ -57,10 +57,10 @@ private: void removeFilesBatch(const Strings & file_names); const S3::URI s3_uri; - const std::shared_ptr client; - S3Settings::RequestSettings request_settings; - std::optional supports_batch_delete; const DataSourceDescription data_source_description; + S3Settings s3_settings; + std::shared_ptr client; + std::optional supports_batch_delete; }; } diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 21d87c58d20..79f150259db 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -27,6 +27,8 @@ #include #include +#include + namespace ProfileEvents { @@ -599,7 +601,9 @@ Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn r last_exception = std::current_exception(); auto error = Aws::Client::AWSError(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true); - client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no); + auto sleep_ms = client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no); + LOG_WARNING(log, "Request failed, now waiting {} ms before attempting again", sleep_ms); + sleepForMilliseconds(sleep_ms); continue; } }