Fix lost errors in AWS CPP SDK

This commit is contained in:
alesapin 2022-08-23 00:34:39 +02:00
parent a3e4753d83
commit 780f39b108
6 changed files with 236 additions and 6 deletions

View File

@ -5,6 +5,8 @@
#include "PocoHTTPClient.h"
#include <utility>
#include <algorithm>
#include <functional>
#include <Common/logger_useful.h>
#include <Common/Stopwatch.h>
@ -14,6 +16,7 @@
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/utils/xml/XmlSerializer.h>
#include <aws/core/monitoring/HttpClientMetrics.h>
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
#include "Poco/StreamCopier.h"
@ -121,6 +124,37 @@ std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
return response;
}
namespace
{
/// No comments:
/// 1) https://aws.amazon.com/premiumsupport/knowledge-center/s3-resolve-200-internalerror/
/// 2) https://github.com/aws/aws-sdk-cpp/issues/658
bool checkRequestCanReturn200AndErrorInBody(Aws::Http::HttpRequest & request)
{
auto query_params = request.GetQueryStringParameters();
if (request.HasHeader("z-amz-copy-source"))
{
/// CopyObject https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html
if (query_params.empty())
return true;
/// UploadPartCopy https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
if (query_params.contains("partNumber") && query_params.contains("uploadId"))
return true;
}
else
{
/// CompleteMultipartUpload https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html
if (query_params.size() == 1 && query_params.contains("uploadId"))
return true;
}
return false;
}
}
void PocoHTTPClient::makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<PocoHTTPResponse> & response,
@ -281,6 +315,7 @@ void PocoHTTPClient::makeRequestInternal(
ProfileEvents::increment(select_metric(S3MetricType::Microseconds), watch.elapsedMicroseconds());
int status_code = static_cast<int>(poco_response.getStatus());
if (enable_s3_requests_logging)
LOG_TEST(log, "Response status: {}, {}", status_code, poco_response.getReason());
@ -316,19 +351,41 @@ void PocoHTTPClient::makeRequestInternal(
response->AddHeader(header_name, header_value);
}
if (status_code == 429 || status_code == 503)
{ // API throttling
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
}
else if (status_code >= 300)
if (status_code == 200 && checkRequestCanReturn200AndErrorInBody(request))
{
std::string response_string((std::istreambuf_iterator<char>(response_body_stream)),
std::istreambuf_iterator<char>());
LOG_TRACE(log, "Got dangerous response with code 200, checking its body: '{}'", response_string.substr(0, 300));
const static std::string_view needle = "<Error>";
if (auto it = std::search(response_string.begin(), response_string.end(), std::default_searcher(needle.begin(), needle.end())); it != response_string.end())
{
LOG_WARNING(log, "Response for request contain <Error> tag in body, settings internal server error (500 code)");
response->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
status_code = static_cast<int32_t>(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR);
}
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (status_code >= 500 && error_report)
error_report(request_configuration);
}
else
{
response->SetResponseBody(response_body_stream, session);
if (status_code == 429 || status_code == 503)
{ // API throttling
ProfileEvents::increment(select_metric(S3MetricType::Throttling));
}
else if (status_code >= 300)
{
ProfileEvents::increment(select_metric(S3MetricType::Errors));
if (status_code >= 500 && error_report)
error_report(request_configuration);
}
response->SetResponseBody(response_body_stream, session);
}
return;
}
throw Exception(String("Too many redirects while trying to access ") + request.GetUri().GetURIString(),

View File

@ -0,0 +1 @@
#!/usr/bin/env python3

View File

@ -0,0 +1,35 @@
<clickhouse>
<logger>
<level>test</level>
</logger>
<storage_configuration>
<disks>
<s3>
<type>s3</type>
<!-- Use custom S3 endpoint -->
<endpoint>http://resolver:8080/root/data/</endpoint>
<access_key_id>minio</access_key_id>
<secret_access_key>minio123</secret_access_key>
<!-- ClickHouse starts earlier than custom S3 endpoint. Skip access check to avoid fail on start-up -->
<skip_access_check>true</skip_access_check>
<!-- Avoid extra retries to speed up tests -->
<retry_attempts>0</retry_attempts>
<skip_access_check>true</skip_access_check>
</s3>
</disks>
<policies>
<s3>
<volumes>
<main>
<disk>s3</disk>
</main>
</volumes>
</s3>
</policies>
</storage_configuration>
<merge_tree>
<storage_policy>s3</storage_policy>
</merge_tree>
</clickhouse>

View File

@ -0,0 +1,22 @@
<clickhouse>
<profiles>
<default>
<s3_max_single_part_upload_size>1</s3_max_single_part_upload_size>
<enable_s3_requests_logging>1</enable_s3_requests_logging>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas><default></default></quotas>
</clickhouse>

View File

@ -0,0 +1,31 @@
#!/usr/bin/env python3
from bottle import request, route, run, response
# Handle for MultipleObjectsDelete.
@route("/<_bucket>", ["POST"])
def delete(_bucket):
response.set_header(
"Location", "http://minio1:9001/" + _bucket + "?" + request.query_string
)
response.status = 307
return "Redirected"
@route("/<_bucket>/<_path:path>", ["GET", "POST", "PUT", "DELETE"])
def server(_bucket, _path):
if request.query_string.startswith("uploadId="):
response.status = 200
response.content_type = "text/xml"
return '<?xml version="1.0" encoding="UTF-8"?><Error><Code>InternalError</Code><Message>We encountered an internal error. Please try again.</Message><RequestId>txfbd566d03042474888193-00608d7538</RequestId></Error>'
response.set_header("Location", "http://minio1:9001/" + _bucket + "/" + _path + "?" + request.query_string)
response.status = 307
return "Redirected"
@route("/")
def ping():
return "OK"
run(host="0.0.0.0", port=8080)

View File

@ -0,0 +1,84 @@
#!/usr/bin/env python3
import logging
import os
import time
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
# Runs custom python-based S3 endpoint.
def run_endpoint(cluster):
logging.info("Starting custom S3 endpoint")
container_id = cluster.get_container_id("resolver")
current_dir = os.path.dirname(__file__)
cluster.copy_file_to_container(
container_id,
os.path.join(current_dir, "s3_endpoint", "endpoint.py"),
"endpoint.py",
)
cluster.exec_in_container(container_id, ["python", "endpoint.py"], detach=True)
# Wait for S3 endpoint start
num_attempts = 100
for attempt in range(num_attempts):
ping_response = cluster.exec_in_container(
cluster.get_container_id("resolver"),
["curl", "-s", "http://resolver:8080/"],
nothrow=True,
)
if ping_response != "OK":
if attempt == num_attempts - 1:
assert ping_response == "OK", 'Expected "OK", but got "{}"'.format(
ping_response
)
else:
time.sleep(1)
else:
break
logging.info("S3 endpoint started")
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=[
"configs/storage_conf.xml",
],
user_configs=[
"configs/upload_min_size.xml",
],
with_minio=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
run_endpoint(cluster)
yield cluster
finally:
cluster.shutdown()
def test_dataloss(cluster):
node = cluster.instances["node"]
node.query(
"""
CREATE TABLE s3_failover_test (
id Int64,
data String
) ENGINE=MergeTree()
ORDER BY id
"""
)
with pytest.raises(Exception) as err:
node.query("INSERT INTO s3_failover_test VALUES (1, 'Hello')")