diff --git a/src/IO/S3/Client.cpp b/src/IO/S3/Client.cpp index 94a7b5166da..51c7ee32579 100644 --- a/src/IO/S3/Client.cpp +++ b/src/IO/S3/Client.cpp @@ -357,12 +357,14 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c Model::ListObjectsV2Outcome Client::ListObjectsV2(const ListObjectsV2Request & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::ListObjectsV2Request & req) { return ListObjectsV2(req); }); } Model::ListObjectsOutcome Client::ListObjects(const ListObjectsRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::ListObjectsRequest & req) { return ListObjects(req); }); } Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) const @@ -372,19 +374,19 @@ Model::GetObjectOutcome Client::GetObject(const GetObjectRequest & request) cons Model::AbortMultipartUploadOutcome Client::AbortMultipartUpload(const AbortMultipartUploadRequest & request) const { - return doRequestWithRetryNetworkErrors( + return doRequestWithRetryNetworkErrors( request, [this](const Model::AbortMultipartUploadRequest & req) { return AbortMultipartUpload(req); }); } Model::CreateMultipartUploadOutcome Client::CreateMultipartUpload(const CreateMultipartUploadRequest & request) const { - return doRequestWithRetryNetworkErrors( + return doRequestWithRetryNetworkErrors( request, [this](const Model::CreateMultipartUploadRequest & req) { return CreateMultipartUpload(req); }); } Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const CompleteMultipartUploadRequest & request) const { - auto outcome = doRequestWithRetryNetworkErrors( + auto outcome = doRequestWithRetryNetworkErrors( request, [this](const Model::CompleteMultipartUploadRequest & req) { return CompleteMultipartUpload(req); }); if (!outcome.IsSuccess() || provider_type != ProviderType::GCS) @@ -414,32 +416,38 @@ Model::CompleteMultipartUploadOutcome Client::CompleteMultipartUpload(const Comp Model::CopyObjectOutcome Client::CopyObject(const CopyObjectRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::CopyObjectRequest & req) { return CopyObject(req); }); } Model::PutObjectOutcome Client::PutObject(const PutObjectRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::PutObjectRequest & req) { return PutObject(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::PutObjectRequest & req) { return PutObject(req); }); } Model::UploadPartOutcome Client::UploadPart(const UploadPartRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::UploadPartRequest & req) { return UploadPart(req); }); } Model::UploadPartCopyOutcome Client::UploadPartCopy(const UploadPartCopyRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::UploadPartCopyRequest & req) { return UploadPartCopy(req); }); } Model::DeleteObjectOutcome Client::DeleteObject(const DeleteObjectRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::DeleteObjectRequest & req) { return DeleteObject(req); }); } Model::DeleteObjectsOutcome Client::DeleteObjects(const DeleteObjectsRequest & request) const { - return doRequestWithRetryNetworkErrors(request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); }); + return doRequestWithRetryNetworkErrors( + request, [this](const Model::DeleteObjectsRequest & req) { return DeleteObjects(req); }); } Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & request) const @@ -468,7 +476,8 @@ Client::ComposeObjectOutcome Client::ComposeObject(const ComposeObjectRequest & return ComposeObjectOutcome(MakeRequest(req, endpointResolutionOutcome.GetResult(), Aws::Http::HttpMethod::HTTP_PUT)); }; - return doRequestWithRetryNetworkErrors(request, request_fn); + return doRequestWithRetryNetworkErrors( + request, request_fn); } template @@ -549,52 +558,60 @@ Client::doRequest(const RequestType & request, RequestFn request_fn) const throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects"); } -template +template std::invoke_result_t Client::doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const { auto with_retries = [this, request_fn_ = std::move(request_fn)] (const RequestType & request_) { - const size_t max_tries = 10; + chassert(client_configuration.retryStrategy); + const Int64 max_attempts = client_configuration.retryStrategy->GetMaxAttempts(); std::exception_ptr last_exception = nullptr; - for (size_t try_no = 0; try_no < max_tries; ++try_no) + for (Int64 attempt_no = 0; attempt_no < max_attempts; ++attempt_no) { try { + /// S3 does retries network errors actually. + /// But it is matter when errors occur. + /// This code retries a specific case when + /// network error happens when XML document is being read from the response body. + /// Hence, the response body is a stream, network errors are possible at reading. + /// S3 doesn't retry them. + + /// Not all requests can be retried in that way. + /// Requests that read out response body to build the result are possible to retry. + /// Requests that expose the response stream as an answer are not retried with that code. E.g. GetObject. return request_fn_(request_); } catch (Poco::Net::ConnectionResetException &) { - // to do distinguish read/write - if (client_configuration.for_disk_s3) + + if constexpr (IsReadMethod) { - ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors); - ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors); + if (client_configuration.for_disk_s3) + ProfileEvents::increment(ProfileEvents::DiskS3ReadRequestsErrors); + else + ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors); } else { - ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors); - ProfileEvents::increment(ProfileEvents::S3ReadRequestsErrors); + if (client_configuration.for_disk_s3) + ProfileEvents::increment(ProfileEvents::DiskS3WriteRequestsErrors); + else + ProfileEvents::increment(ProfileEvents::S3WriteRequestsErrors); } tryLogCurrentException(log, "Will retry"); - // to do back off last_exception = std::current_exception(); + + auto error = Aws::Client::AWSError(Aws::Client::CoreErrors::NETWORK_CONNECTION, /*retry*/ true); + client_configuration.retryStrategy->CalculateDelayBeforeNextRetry(error, attempt_no); continue; } } chassert(last_exception); std::rethrow_exception(last_exception); - -// try -// { -// std::rethrow_exception(last_exception); -// } -// catch (const Poco::Exception & e) -// { -// throw Exception(Exception::CreateFromPocoTag{}, e); -// } }; return doRequest(request, with_retries); diff --git a/src/IO/S3/Client.h b/src/IO/S3/Client.h index bb893c21774..1b0fdcefe32 100644 --- a/src/IO/S3/Client.h +++ b/src/IO/S3/Client.h @@ -250,7 +250,7 @@ private: std::invoke_result_t doRequest(const RequestType & request, RequestFn request_fn) const; - template + template std::invoke_result_t doRequestWithRetryNetworkErrors(const RequestType & request, RequestFn request_fn) const; diff --git a/tests/integration/test_checking_s3_blobs_paranoid/test.py b/tests/integration/test_checking_s3_blobs_paranoid/test.py index 6aa149b9c7b..28b0c9beeaa 100644 --- a/tests/integration/test_checking_s3_blobs_paranoid/test.py +++ b/tests/integration/test_checking_s3_blobs_paranoid/test.py @@ -248,7 +248,9 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried( action_args=["1"] if send_something else ["0"], ) - insert_query_id = f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD" + insert_query_id = ( + f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}" + ) node.query( f""" INSERT INTO @@ -283,7 +285,9 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried( action="connection_reset_by_peer", action_args=["1"] if send_something else ["0"], ) - insert_query_id = f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_1" + insert_query_id = ( + f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_UPLOAD_{send_something}_1" + ) error = node.query_and_get_error( f""" INSERT INTO @@ -307,7 +311,8 @@ def test_when_s3_connection_reset_by_peer_at_upload_is_retried( assert "Code: 1000" in error, error assert ( "DB::Exception: Connection reset by peer." in error - or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error + or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" + in error ), error @@ -325,7 +330,9 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( action_args=["1"] if send_something else ["0"], ) - insert_query_id = f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD" + insert_query_id = ( + f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}" + ) node.query( f""" INSERT INTO @@ -361,7 +368,9 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( action_args=["1"] if send_something else ["0"], ) - insert_query_id = f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_1" + insert_query_id = ( + f"TEST_WHEN_S3_CONNECTION_RESET_BY_PEER_AT_MULTIPARTUPLOAD_{send_something}_1" + ) error = node.query_and_get_error( f""" INSERT INTO @@ -385,5 +394,6 @@ def test_when_s3_connection_reset_by_peer_at_create_mpu_retried( assert "Code: 1000" in error, error assert ( "DB::Exception: Connection reset by peer." in error - or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" in error + or "DB::Exception: Poco::Exception. Code: 1000, e.code() = 104, Connection reset by peer" + in error ), error