limit the delay before next try in S3

This commit is contained in:
Sema Checherinda 2023-09-14 19:45:07 +02:00
parent a365557d9e
commit d9e15c00c9
11 changed files with 44 additions and 52 deletions

View File

@ -49,6 +49,7 @@ namespace
settings.auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler,
s3_uri.uri.getScheme());

View File

@ -80,6 +80,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
auto headers = auth_settings.headers;
static constexpr size_t s3_max_redirects = 10;
static constexpr size_t s3_retry_attempts = 10;
static constexpr bool enable_s3_requests_logging = false;
if (!new_uri.key.empty())
@ -90,7 +91,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
auth_settings.region,
RemoteHostFilter(), s3_max_redirects,
RemoteHostFilter(), s3_max_redirects, s3_retry_attempts,
enable_s3_requests_logging,
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {},
new_uri.uri.getScheme());

View File

@ -52,6 +52,7 @@ std::unique_ptr<S3::Client> getClient(
config.getString(config_prefix + ".region", ""),
context->getRemoteHostFilter(),
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ true,
settings.request_settings.get_request_throttler,

View File

@ -49,11 +49,12 @@ namespace ErrorCodes
namespace S3
{
Client::RetryStrategy::RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_)
: wrapped_strategy(std::move(wrapped_strategy_))
Client::RetryStrategy::RetryStrategy(uint32_t maxRetries_, uint32_t scaleFactor_, uint32_t maxDelayMs_)
: maxRetries(maxRetries_)
, scaleFactor(scaleFactor_)
, maxDelayMs(maxDelayMs_)
{
if (!wrapped_strategy)
wrapped_strategy = Aws::Client::InitRetryStrategy();
chassert(maxDelayMs <= uint64_t(scaleFactor) * (1ul << 31l));
}
/// NOLINTNEXTLINE(google-runtime-int)
@ -62,39 +63,28 @@ bool Client::RetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client:
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
return false;
return wrapped_strategy->ShouldRetry(error, attemptedRetries);
if (attemptedRetries >= maxRetries)
return false;
return error.ShouldRetry();
}
/// NOLINTNEXTLINE(google-runtime-int)
long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const
long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>&, long attemptedRetries) const
{
return wrapped_strategy->CalculateDelayBeforeNextRetry(error, attemptedRetries);
if (attemptedRetries == 0)
{
return 0;
}
uint64_t backoffLimitedPow = 1ul << std::min(attemptedRetries, 31l);
return std::min<uint64_t>(scaleFactor * backoffLimitedPow, maxDelayMs);
}
/// NOLINTNEXTLINE(google-runtime-int)
long Client::RetryStrategy::GetMaxAttempts() const
{
return wrapped_strategy->GetMaxAttempts();
}
void Client::RetryStrategy::GetSendToken()
{
return wrapped_strategy->GetSendToken();
}
bool Client::RetryStrategy::HasSendToken()
{
return wrapped_strategy->HasSendToken();
}
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome)
{
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome);
}
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError)
{
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome, lastError);
return maxRetries;
}
namespace
@ -846,7 +836,8 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
std::move(credentials),
credentials_configuration);
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(client_configuration.s3_retry_attempts);
return Client::create(
client_configuration.s3_max_redirects,
std::move(sse_kms_config),
@ -861,6 +852,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
unsigned int s3_retry_attempts,
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,
@ -879,6 +871,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
force_region,
remote_host_filter,
s3_max_redirects,
s3_retry_attempts,
enable_s3_requests_logging,
for_disk_s3,
get_request_throttler,

View File

@ -152,16 +152,16 @@ public:
Aws::Auth::AWSCredentials getCredentials() const;
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
/// requests so we need to manually find the correct location
/// - we want to cache the new location to decrease number of roundtrips for future requests
/// This decorator doesn't retry if 301 is detected and fallbacks to the inner retry strategy otherwise.
/// Other retries are processed with exponential backoff timeout
/// which is limited and rundomly spread
class RetryStrategy : public Aws::Client::RetryStrategy
{
public:
explicit RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_);
RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 90000);
/// NOLINTNEXTLINE(google-runtime-int)
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override;
@ -172,14 +172,10 @@ public:
/// NOLINTNEXTLINE(google-runtime-int)
long GetMaxAttempts() const override;
void GetSendToken() override;
bool HasSendToken() override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) override;
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError) override;
private:
std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy;
uint32_t maxRetries;
uint32_t scaleFactor;
uint32_t maxDelayMs;
};
/// SSE-KMS headers MUST be signed, so they need to be added before the SDK signs the message
@ -311,6 +307,7 @@ public:
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
unsigned int s3_retry_attempts,
bool enable_s3_requests_logging,
bool for_disk_s3,
const ThrottlerPtr & get_request_throttler,

View File

@ -623,6 +623,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.s3_retry_attempts,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
@ -637,6 +638,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.s3_retry_attempts,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,
@ -679,6 +681,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
configuration.region,
configuration.remote_host_filter,
configuration.s3_max_redirects,
configuration.s3_retry_attempts,
configuration.enable_s3_requests_logging,
configuration.for_disk_s3,
configuration.get_request_throttler,

View File

@ -89,6 +89,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
unsigned int s3_retry_attempts_,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,
@ -98,6 +99,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
, force_region(force_region_)
, remote_host_filter(remote_host_filter_)
, s3_max_redirects(s3_max_redirects_)
, s3_retry_attempts(s3_retry_attempts_)
, enable_s3_requests_logging(enable_s3_requests_logging_)
, for_disk_s3(for_disk_s3_)
, get_request_throttler(get_request_throttler_)

View File

@ -41,6 +41,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
String force_region;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
unsigned int s3_retry_attempts;
bool enable_s3_requests_logging;
bool for_disk_s3;
ThrottlerPtr get_request_throttler;
@ -64,6 +65,7 @@ private:
const String & force_region_,
const RemoteHostFilter & remote_host_filter_,
unsigned int s3_max_redirects_,
unsigned int s3_retry_attempts,
bool enable_s3_requests_logging_,
bool for_disk_s3_,
const ThrottlerPtr & get_request_throttler_,

View File

@ -40,14 +40,6 @@
[[maybe_unused]] static Poco::Util::ServerApplication app;
class NoRetryStrategy : public Aws::Client::StandardRetryStrategy
{
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> &, long /* NOLINT */) const override { return false; }
public:
~NoRetryStrategy() override = default;
};
String getSSEAndSignedHeaders(const Poco::Net::MessageHeader & message_header)
{
String content;
@ -123,6 +115,7 @@ void testServerSideEncryption(
DB::RemoteHostFilter remote_host_filter;
unsigned int s3_max_redirects = 100;
unsigned int s3_retry_attempts = 0;
DB::S3::URI uri(http.getUrl() + "/IOTestAwsS3ClientAppendExtraHeaders/test.txt");
String access_key_id = "ACCESS_KEY_ID";
String secret_access_key = "SECRET_ACCESS_KEY";
@ -132,6 +125,7 @@ void testServerSideEncryption(
region,
remote_host_filter,
s3_max_redirects,
s3_retry_attempts,
enable_s3_requests_logging,
/* for_disk_s3 = */ false,
/* get_request_throttler = */ {},
@ -140,7 +134,6 @@ void testServerSideEncryption(
);
client_configuration.endpointOverride = uri.endpoint;
client_configuration.retryStrategy = std::make_shared<NoRetryStrategy>();
DB::HTTPHeaderEntries headers;
bool use_environment_credentials = false;

View File

@ -228,6 +228,7 @@ struct Client : DB::S3::Client
"some-region",
remote_host_filter,
/* s3_max_redirects = */ 100,
/* s3_retry_attempts = */ 0,
/* enable_s3_requests_logging = */ true,
/* for_disk_s3 = */ false,
/* get_request_throttler = */ {},

View File

@ -1204,6 +1204,7 @@ void StorageS3::Configuration::connect(ContextPtr context)
auth_settings.region,
context->getRemoteHostFilter(),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
/* for_disk_s3 = */ false,
request_settings.get_request_throttler,
@ -1218,9 +1219,6 @@ void StorageS3::Configuration::connect(ContextPtr context)
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
client_configuration.retryStrategy
= std::make_shared<Aws::Client::DefaultRetryStrategy>(request_settings.retry_attempts);
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
client = S3::ClientFactory::instance().create(
client_configuration,