draft with retry ConnectionResetException

This commit is contained in:
Sema Checherinda 2023-07-24 01:14:50 +03:00
parent cfed3589b0
commit c341df1949
3 changed files with 80 additions and 16 deletions

View File

@ -13,6 +13,8 @@
#include <aws/core/utils/HashingUtils.h>
#include <aws/core/utils/logging/ErrorMacros.h>
#include <Poco/Net/NetException.h>
#include <IO/S3Common.h>
#include <IO/S3/Requests.h>
#include <IO/S3/PocoHTTPClientFactory.h>
@ -23,6 +25,15 @@
#include <Common/logger_useful.h>
namespace ProfileEvents
{
extern const Event S3WriteRequestsErrors;
extern const Event S3ReadRequestsErrors;
extern const Event DiskS3WriteRequestsErrors;
extern const Event DiskS3ReadRequestsErrors;
}
namespace DB
{
@ -346,12 +357,12 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const
{
return doRequest(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); });
}
Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const
{
return doRequest(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); });
}
Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const
@ -361,19 +372,19 @@ Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) cons
Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const
{
return doRequest(
return doRequestWithRetryNetworkErrors(
request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); });
}
Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const
{
return doRequest(
return doRequestWithRetryNetworkErrors(
request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); });
}
Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const
{
auto outcome = doRequest(
auto outcome = doRequestWithRetryNetworkErrors(
request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); });
if (!outcome.IsSuccess() || provider_type != ProviderType::GCS)
@ -403,32 +414,32 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const Comp
Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const
{
return doRequest(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); });
}
Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const
{
return doRequest(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); });
}
Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); });
}
Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const
{
return doRequest(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); });
}
Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); });
}
Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const
{
return doRequest(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
return doRequestWithRetryNetworkErrors(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); });
}
Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const
@ -457,7 +468,7 @@ Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest &
return ComposeObjectOutcome(MakeRequest(req, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_PUT));
};
return doRequest(request, request_fn);
return doRequestWithRetryNetworkErrors(request, request_fn);
}
template <typename RequestType, typename RequestFn>
@ -538,6 +549,57 @@ Client::doRequest(const RequestType & request, RequestFn request_fn) const
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const
{
auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_)
{
const size_t max_tries = 10;
std::exception_ptr last_exception = nullptr;
for (size_t try_no = 0; try_no < max_tries; ++try_no)
{
try
{
return request_fn_(request_);
}
catch (Poco::Net::ConnectionResetException &)
{
// to do distinguish read/write
if (client_configuration.for_disk_s3)
{
ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors);
ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors);
}
else
{
ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors);
ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors);
}
tryLogCurrentException(log, "Will retry");
// to do back off
last_exception = std::current_exception();
continue;
}
}
chassert(last_exception);
std::rethrow_exception(last_exception);
// try
// {
// std::rethrow_exception(last_exception);
// }
// catch (const Poco::Exception & e)
// {
// throw Exception(Exception::CreateFromPocoTag{}, e);
// }
};
return doRequest(request, with_retries);
}
bool Client::supportsMultiPartCopy() const
{
return provider_type != ProviderType::GCS;

View File

@ -250,6 +250,10 @@ private:
std::invoke_result_t<RequestFn, RequestType>
doRequest(const RequestType & request, RequestFn request_fn) const;
template <typename RequestType, typename RequestFn>
std::invoke_result_t<RequestFn, RequestType>
doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const;
void updateURIForBucket(const std::string & bucket, S3::URI new_uri) const;
std::optional<S3::URI> getURIFromError(const Aws::S3::S3Error & error) const;
std::optional<Aws::S3::S3Error> updateURIForBucketForHead(const std::string & bucket) const;

View File

@ -307,8 +307,7 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried(
assert "Code: 1000" in error, error
assert (
"DB::Exception: Connection reset by peer." in error
or
"DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error
or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error
), error
@ -386,6 +385,5 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried(
assert "Code: 1000" in error, error
assert (
"DB::Exception: Connection reset by peer." in error
or
"DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error
or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error
), error