mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-10 09:32:06 +00:00
Merge pull request #54651 from CheSema/limit_backoff_timeout
limit the delay before next try in S3
This commit is contained in:
commit
e7550523c8
@ -49,6 +49,7 @@ namespace
|
||||
settings.auth_settings.region,
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false, settings.request_settings.get_request_throttler, settings.request_settings.put_request_throttler,
|
||||
s3_uri.uri.getScheme());
|
||||
|
@ -80,6 +80,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
auto headers = auth_settings.headers;
|
||||
|
||||
static constexpr size_t s3_max_redirects = 10;
|
||||
static constexpr size_t s3_retry_attempts = 10;
|
||||
static constexpr bool enable_s3_requests_logging = false;
|
||||
|
||||
if (!new_uri.key.empty())
|
||||
@ -90,7 +91,7 @@ void KeeperSnapshotManagerS3::updateS3Configuration(const Poco::Util::AbstractCo
|
||||
|
||||
S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
|
||||
auth_settings.region,
|
||||
RemoteHostFilter(), s3_max_redirects,
|
||||
RemoteHostFilter(), s3_max_redirects, s3_retry_attempts,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false, /* get_request_throttler = */ {}, /* put_request_throttler = */ {},
|
||||
new_uri.uri.getScheme());
|
||||
|
@ -52,6 +52,7 @@ std::unique_ptr<S3::Client> getClient(
|
||||
config.getString(config_prefix + ".region", ""),
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
static_cast<int>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ true,
|
||||
settings.request_settings.get_request_throttler,
|
||||
|
@ -49,11 +49,12 @@ namespace ErrorCodes
|
||||
namespace S3
|
||||
{
|
||||
|
||||
Client::RetryStrategy::RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_)
|
||||
: wrapped_strategy(std::move(wrapped_strategy_))
|
||||
Client::RetryStrategy::RetryStrategy(uint32_t maxRetries_, uint32_t scaleFactor_, uint32_t maxDelayMs_)
|
||||
: maxRetries(maxRetries_)
|
||||
, scaleFactor(scaleFactor_)
|
||||
, maxDelayMs(maxDelayMs_)
|
||||
{
|
||||
if (!wrapped_strategy)
|
||||
wrapped_strategy = Aws::Client::InitRetryStrategy();
|
||||
chassert(maxDelayMs <= uint64_t(scaleFactor) * (1ul << 31l));
|
||||
}
|
||||
|
||||
/// NOLINTNEXTLINE(google-runtime-int)
|
||||
@ -62,39 +63,28 @@ bool Client::RetryStrategy::ShouldRetry(const Aws::Client::AWSError<Aws::Client:
|
||||
if (error.GetResponseCode() == Aws::Http::HttpResponseCode::MOVED_PERMANENTLY)
|
||||
return false;
|
||||
|
||||
return wrapped_strategy->ShouldRetry(error, attemptedRetries);
|
||||
if (attemptedRetries >= maxRetries)
|
||||
return false;
|
||||
|
||||
return error.ShouldRetry();
|
||||
}
|
||||
|
||||
/// NOLINTNEXTLINE(google-runtime-int)
|
||||
long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const
|
||||
long Client::RetryStrategy::CalculateDelayBeforeNextRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>&, long attemptedRetries) const
|
||||
{
|
||||
return wrapped_strategy->CalculateDelayBeforeNextRetry(error, attemptedRetries);
|
||||
if (attemptedRetries == 0)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint64_t backoffLimitedPow = 1ul << std::min(attemptedRetries, 31l);
|
||||
return std::min<uint64_t>(scaleFactor * backoffLimitedPow, maxDelayMs);
|
||||
}
|
||||
|
||||
/// NOLINTNEXTLINE(google-runtime-int)
|
||||
long Client::RetryStrategy::GetMaxAttempts() const
|
||||
{
|
||||
return wrapped_strategy->GetMaxAttempts();
|
||||
}
|
||||
|
||||
void Client::RetryStrategy::GetSendToken()
|
||||
{
|
||||
return wrapped_strategy->GetSendToken();
|
||||
}
|
||||
|
||||
bool Client::RetryStrategy::HasSendToken()
|
||||
{
|
||||
return wrapped_strategy->HasSendToken();
|
||||
}
|
||||
|
||||
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome)
|
||||
{
|
||||
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome);
|
||||
}
|
||||
|
||||
void Client::RetryStrategy::RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError)
|
||||
{
|
||||
return wrapped_strategy->RequestBookkeeping(httpResponseOutcome, lastError);
|
||||
return maxRetries + 1;
|
||||
}
|
||||
|
||||
namespace
|
||||
@ -569,6 +559,7 @@ Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn r
|
||||
{
|
||||
chassert(client_configuration.retryStrategy);
|
||||
const Int64 max_attempts = client_configuration.retryStrategy->GetMaxAttempts();
|
||||
chassert(max_attempts > 0);
|
||||
std::exception_ptr last_exception = nullptr;
|
||||
for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no)
|
||||
{
|
||||
@ -846,7 +837,8 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
|
||||
std::move(credentials),
|
||||
credentials_configuration);
|
||||
|
||||
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(std::move(client_configuration.retryStrategy));
|
||||
client_configuration.retryStrategy = std::make_shared<Client::RetryStrategy>(client_configuration.s3_retry_attempts);
|
||||
|
||||
return Client::create(
|
||||
client_configuration.s3_max_redirects,
|
||||
std::move(sse_kms_config),
|
||||
@ -861,6 +853,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
|
||||
const String & force_region,
|
||||
const RemoteHostFilter & remote_host_filter,
|
||||
unsigned int s3_max_redirects,
|
||||
unsigned int s3_retry_attempts,
|
||||
bool enable_s3_requests_logging,
|
||||
bool for_disk_s3,
|
||||
const ThrottlerPtr & get_request_throttler,
|
||||
@ -879,6 +872,7 @@ PocoHTTPClientConfiguration ClientFactory::createClientConfiguration( // NOLINT
|
||||
force_region,
|
||||
remote_host_filter,
|
||||
s3_max_redirects,
|
||||
s3_retry_attempts,
|
||||
enable_s3_requests_logging,
|
||||
for_disk_s3,
|
||||
get_request_throttler,
|
||||
|
@ -152,16 +152,16 @@ public:
|
||||
|
||||
Aws::Auth::AWSCredentials getCredentials() const;
|
||||
|
||||
/// Decorator for RetryStrategy needed for this client to work correctly.
|
||||
/// We want to manually handle permanent moves (status code 301) because:
|
||||
/// - redirect location is written in XML format inside the response body something that doesn't exist for HEAD
|
||||
/// requests so we need to manually find the correct location
|
||||
/// - we want to cache the new location to decrease number of roundtrips for future requests
|
||||
/// This decorator doesn't retry if 301 is detected and fallbacks to the inner retry strategy otherwise.
|
||||
/// Other retries are processed with exponential backoff timeout
|
||||
/// which is limited and rundomly spread
|
||||
class RetryStrategy : public Aws::Client::RetryStrategy
|
||||
{
|
||||
public:
|
||||
explicit RetryStrategy(std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy_);
|
||||
RetryStrategy(uint32_t maxRetries_ = 10, uint32_t scaleFactor_ = 25, uint32_t maxDelayMs_ = 90000);
|
||||
|
||||
/// NOLINTNEXTLINE(google-runtime-int)
|
||||
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors>& error, long attemptedRetries) const override;
|
||||
@ -172,14 +172,10 @@ public:
|
||||
/// NOLINTNEXTLINE(google-runtime-int)
|
||||
long GetMaxAttempts() const override;
|
||||
|
||||
void GetSendToken() override;
|
||||
|
||||
bool HasSendToken() override;
|
||||
|
||||
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome) override;
|
||||
void RequestBookkeeping(const Aws::Client::HttpResponseOutcome& httpResponseOutcome, const Aws::Client::AWSError<Aws::Client::CoreErrors>& lastError) override;
|
||||
private:
|
||||
std::shared_ptr<Aws::Client::RetryStrategy> wrapped_strategy;
|
||||
uint32_t maxRetries;
|
||||
uint32_t scaleFactor;
|
||||
uint32_t maxDelayMs;
|
||||
};
|
||||
|
||||
/// SSE-KMS headers MUST be signed, so they need to be added before the SDK signs the message
|
||||
@ -311,6 +307,7 @@ public:
|
||||
const String & force_region,
|
||||
const RemoteHostFilter & remote_host_filter,
|
||||
unsigned int s3_max_redirects,
|
||||
unsigned int s3_retry_attempts,
|
||||
bool enable_s3_requests_logging,
|
||||
bool for_disk_s3,
|
||||
const ThrottlerPtr & get_request_throttler,
|
||||
|
@ -623,6 +623,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
|
||||
configuration.region,
|
||||
configuration.remote_host_filter,
|
||||
configuration.s3_max_redirects,
|
||||
configuration.s3_retry_attempts,
|
||||
configuration.enable_s3_requests_logging,
|
||||
configuration.for_disk_s3,
|
||||
configuration.get_request_throttler,
|
||||
@ -637,6 +638,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
|
||||
configuration.region,
|
||||
configuration.remote_host_filter,
|
||||
configuration.s3_max_redirects,
|
||||
configuration.s3_retry_attempts,
|
||||
configuration.enable_s3_requests_logging,
|
||||
configuration.for_disk_s3,
|
||||
configuration.get_request_throttler,
|
||||
@ -679,6 +681,7 @@ S3CredentialsProviderChain::S3CredentialsProviderChain(
|
||||
configuration.region,
|
||||
configuration.remote_host_filter,
|
||||
configuration.s3_max_redirects,
|
||||
configuration.s3_retry_attempts,
|
||||
configuration.enable_s3_requests_logging,
|
||||
configuration.for_disk_s3,
|
||||
configuration.get_request_throttler,
|
||||
|
@ -96,6 +96,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
||||
const String & force_region_,
|
||||
const RemoteHostFilter & remote_host_filter_,
|
||||
unsigned int s3_max_redirects_,
|
||||
unsigned int s3_retry_attempts_,
|
||||
bool enable_s3_requests_logging_,
|
||||
bool for_disk_s3_,
|
||||
const ThrottlerPtr & get_request_throttler_,
|
||||
@ -105,6 +106,7 @@ PocoHTTPClientConfiguration::PocoHTTPClientConfiguration(
|
||||
, force_region(force_region_)
|
||||
, remote_host_filter(remote_host_filter_)
|
||||
, s3_max_redirects(s3_max_redirects_)
|
||||
, s3_retry_attempts(s3_retry_attempts_)
|
||||
, enable_s3_requests_logging(enable_s3_requests_logging_)
|
||||
, for_disk_s3(for_disk_s3_)
|
||||
, get_request_throttler(get_request_throttler_)
|
||||
|
@ -41,6 +41,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
||||
String force_region;
|
||||
const RemoteHostFilter & remote_host_filter;
|
||||
unsigned int s3_max_redirects;
|
||||
unsigned int s3_retry_attempts;
|
||||
bool enable_s3_requests_logging;
|
||||
bool for_disk_s3;
|
||||
ThrottlerPtr get_request_throttler;
|
||||
@ -64,6 +65,7 @@ private:
|
||||
const String & force_region_,
|
||||
const RemoteHostFilter & remote_host_filter_,
|
||||
unsigned int s3_max_redirects_,
|
||||
unsigned int s3_retry_attempts,
|
||||
bool enable_s3_requests_logging_,
|
||||
bool for_disk_s3_,
|
||||
const ThrottlerPtr & get_request_throttler_,
|
||||
|
@ -40,14 +40,6 @@
|
||||
[[maybe_unused]] static Poco::Util::ServerApplication app;
|
||||
|
||||
|
||||
class NoRetryStrategy : public Aws::Client::StandardRetryStrategy
|
||||
{
|
||||
bool ShouldRetry(const Aws::Client::AWSError<Aws::Client::CoreErrors> &, long /* NOLINT */) const override { return false; }
|
||||
|
||||
public:
|
||||
~NoRetryStrategy() override = default;
|
||||
};
|
||||
|
||||
String getSSEAndSignedHeaders(const Poco::Net::MessageHeader & message_header)
|
||||
{
|
||||
String content;
|
||||
@ -123,6 +115,7 @@ void testServerSideEncryption(
|
||||
|
||||
DB::RemoteHostFilter remote_host_filter;
|
||||
unsigned int s3_max_redirects = 100;
|
||||
unsigned int s3_retry_attempts = 0;
|
||||
DB::S3::URI uri(http.getUrl() + "/IOTestAwsS3ClientAppendExtraHeaders/test.txt");
|
||||
String access_key_id = "ACCESS_KEY_ID";
|
||||
String secret_access_key = "SECRET_ACCESS_KEY";
|
||||
@ -132,6 +125,7 @@ void testServerSideEncryption(
|
||||
region,
|
||||
remote_host_filter,
|
||||
s3_max_redirects,
|
||||
s3_retry_attempts,
|
||||
enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false,
|
||||
/* get_request_throttler = */ {},
|
||||
@ -140,7 +134,6 @@ void testServerSideEncryption(
|
||||
);
|
||||
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.retryStrategy = std::make_shared<NoRetryStrategy>();
|
||||
|
||||
DB::HTTPHeaderEntries headers;
|
||||
bool use_environment_credentials = false;
|
||||
|
@ -228,6 +228,7 @@ struct Client : DB::S3::Client
|
||||
"some-region",
|
||||
remote_host_filter,
|
||||
/* s3_max_redirects = */ 100,
|
||||
/* s3_retry_attempts = */ 0,
|
||||
/* enable_s3_requests_logging = */ true,
|
||||
/* for_disk_s3 = */ false,
|
||||
/* get_request_throttler = */ {},
|
||||
|
@ -1212,6 +1212,7 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
auth_settings.region,
|
||||
context->getRemoteHostFilter(),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_max_redirects),
|
||||
static_cast<unsigned>(context->getGlobalContext()->getSettingsRef().s3_retry_attempts),
|
||||
context->getGlobalContext()->getSettingsRef().enable_s3_requests_logging,
|
||||
/* for_disk_s3 = */ false,
|
||||
request_settings.get_request_throttler,
|
||||
@ -1226,9 +1227,6 @@ void StorageS3::Configuration::connect(ContextPtr context)
|
||||
|
||||
client_configuration.requestTimeoutMs = request_settings.request_timeout_ms;
|
||||
|
||||
client_configuration.retryStrategy
|
||||
= std::make_shared<Aws::Client::DefaultRetryStrategy>(request_settings.retry_attempts);
|
||||
|
||||
auto credentials = Aws::Auth::AWSCredentials(auth_settings.access_key_id, auth_settings.secret_access_key);
|
||||
client = S3::ClientFactory::instance().create(
|
||||
client_configuration,
|
||||
|
Loading…
Reference in New Issue
Block a user