Merge pull request #11230 from Jokser/s3-poco-http-client

S3 Poco Http Client
This commit is contained in:
alexey-milovidov 2020-06-01 01:20:21 +03:00 committed by GitHub
commit 256f84ddc2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 275 additions and 6 deletions

View File

@ -74,6 +74,7 @@ if(USE_RDKAFKA)
endif()
if (USE_AWS_S3)
add_headers_and_sources(dbms Common/S3)
add_headers_and_sources(dbms Disks/S3)
endif()

View File

@ -68,7 +68,7 @@ namespace
throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
}
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host=true)
{
HTTPSessionPtr session;
@ -83,7 +83,10 @@ namespace
ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);
if (resolve_host)
session->setHost(DNSResolver::instance().resolveHost(host).toString());
else
session->setHost(host);
session->setPort(port);
/// doesn't work properly without patch
@ -202,13 +205,13 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host)
{
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto session = makeHTTPSessionImpl(host, port, https, false);
auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host);
setTimeouts(*session, timeouts);
return session;
}

View File

@ -45,7 +45,7 @@ using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout);
/// Create session object to perform requests and set required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
/// As previous method creates session, but tooks it from pool
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size);

View File

@ -0,0 +1,163 @@
#include "PocoHttpClient.h"
#include <utility>
#include <IO/HTTPCommon.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
#include <aws/core/monitoring/HttpClientMetrics.h>
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
#include "Poco/StreamCopier.h"
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <common/logger_useful.h>
namespace DB::S3
{
PocoHttpClient::PocoHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration)
: per_request_configuration(clientConfiguration.perRequestConfiguration)
, timeouts(ConnectionTimeouts(
Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout.
))
{
}
std::shared_ptr<Aws::Http::HttpResponse> PocoHttpClient::MakeRequest(
Aws::Http::HttpRequest & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
auto response = Aws::MakeShared<Aws::Http::Standard::StandardHttpResponse>("PocoHttpClient", request);
MakeRequestInternal(request, response, readLimiter, writeLimiter);
return response;
}
std::shared_ptr<Aws::Http::HttpResponse> PocoHttpClient::MakeRequest(
const std::shared_ptr<Aws::Http::HttpRequest> & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
auto response = Aws::MakeShared<Aws::Http::Standard::StandardHttpResponse>("PocoHttpClient", request);
MakeRequestInternal(*request, response, readLimiter, writeLimiter);
return response;
}
void PocoHttpClient::MakeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<Aws::Http::Standard::StandardHttpResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface *,
Aws::Utils::RateLimits::RateLimiterInterface *) const
{
auto uri = request.GetUri().GetURIString();
LOG_DEBUG(&Logger::get("AWSClient"), "Make request to: {}", uri);
const int MAX_REDIRECT_ATTEMPTS = 10;
try
{
for (int attempt = 0; attempt < MAX_REDIRECT_ATTEMPTS; ++attempt)
{
Poco::URI poco_uri(uri);
/// Reverse proxy can replace host header with resolved ip address instead of host name.
/// This can lead to request signature difference on S3 side.
auto session = makeHTTPSession(poco_uri, timeouts, false);
auto request_configuration = per_request_configuration(request);
if (!request_configuration.proxyHost.empty())
session->setProxy(request_configuration.proxyHost, request_configuration.proxyPort);
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
poco_request.setURI(poco_uri.getPathAndQuery());
switch (request.GetMethod())
{
case Aws::Http::HttpMethod::HTTP_GET:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_GET);
break;
case Aws::Http::HttpMethod::HTTP_POST:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_POST);
break;
case Aws::Http::HttpMethod::HTTP_DELETE:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_DELETE);
break;
case Aws::Http::HttpMethod::HTTP_PUT:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PUT);
break;
case Aws::Http::HttpMethod::HTTP_HEAD:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_HEAD);
break;
case Aws::Http::HttpMethod::HTTP_PATCH:
poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PATCH);
break;
}
for (const auto & [header_name, header_value] : request.GetHeaders())
poco_request.set(header_name, header_value);
Poco::Net::HTTPResponse poco_response;
auto & request_body_stream = session->sendRequest(poco_request);
if (request.GetContentBody())
{
LOG_DEBUG(&Logger::get("AWSClient"), "Writing request body...");
if (attempt > 0) /// rewind content body buffer.
{
request.GetContentBody()->clear();
request.GetContentBody()->seekg(0);
}
auto size = Poco::StreamCopier::copyStream(*request.GetContentBody(), request_body_stream);
LOG_DEBUG(&Logger::get("AWSClient"), "Written {} bytes to request body", size);
}
LOG_DEBUG(&Logger::get("AWSClient"), "Receiving response...");
auto & response_body_stream = session->receiveResponse(poco_response);
int status_code = static_cast<int>(poco_response.getStatus());
LOG_DEBUG(&Logger::get("AWSClient"), "Response status: {}, {}", status_code, poco_response.getReason());
if (poco_response.getStatus() == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT)
{
auto location = poco_response.get("location");
uri = location;
LOG_DEBUG(&Logger::get("AWSClient"), "Redirecting request to new location: {}", location);
continue;
}
response->SetResponseCode(static_cast<Aws::Http::HttpResponseCode>(status_code));
response->SetContentType(poco_response.getContentType());
std::stringstream headers_ss;
for (const auto & [header_name, header_value] : poco_response)
{
response->AddHeader(header_name, header_value);
headers_ss << header_name << " : " << header_value << "; ";
}
LOG_DEBUG(&Logger::get("AWSClient"), "Received headers: {}", headers_ss.str());
if (status_code >= 300)
{
String error_message;
Poco::StreamCopier::copyToString(response_body_stream, error_message);
response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION);
response->SetClientErrorMessage(error_message);
}
else
/// TODO: Do not copy whole stream.
Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody());
break;
}
}
catch (...)
{
tryLogCurrentException(&Logger::get("AWSClient"), "Failed to make request to: " + uri);
response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION);
response->SetClientErrorMessage(getCurrentExceptionMessage(false));
}
}
}

View File

@ -0,0 +1,40 @@
#pragma once
#include <IO/ConnectionTimeouts.h>
#include <aws/core/http/HttpClient.h>
namespace Aws::Http::Standard
{
class StandardHttpResponse;
}
namespace DB::S3
{
class PocoHttpClient : public Aws::Http::HttpClient
{
public:
explicit PocoHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration);
~PocoHttpClient() override = default;
std::shared_ptr<Aws::Http::HttpResponse> MakeRequest(
Aws::Http::HttpRequest & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const override;
std::shared_ptr<Aws::Http::HttpResponse> MakeRequest(
const std::shared_ptr<Aws::Http::HttpRequest> & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const override;
private:
void MakeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<Aws::Http::Standard::StandardHttpResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
std::function<Aws::Client::ClientConfigurationPerRequest(const Aws::Http::HttpRequest &)> per_request_configuration;
ConnectionTimeouts timeouts;
};
}

View File

@ -0,0 +1,32 @@
#include "PocoHttpClientFactory.h"
#include <IO/S3/PocoHttpClient.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/http/standard/StandardHttpRequest.h>
namespace DB::S3
{
std::shared_ptr<Aws::Http::HttpClient>
PocoHttpClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const
{
return std::make_shared<PocoHttpClient>(clientConfiguration);
}
std::shared_ptr<Aws::Http::HttpRequest> PocoHttpClientFactory::CreateHttpRequest(
const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const
{
return CreateHttpRequest(Aws::Http::URI(uri), method, streamFactory);
}
std::shared_ptr<Aws::Http::HttpRequest> PocoHttpClientFactory::CreateHttpRequest(
const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const
{
auto request = Aws::MakeShared<Aws::Http::Standard::StandardHttpRequest>("PocoHttpClientFactory", uri, method);
request->SetResponseStreamFactory(streamFactory);
return request;
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <aws/core/http/HttpClientFactory.h>
namespace Aws::Http
{
class HttpClient;
class HttpRequest;
}
namespace DB::S3
{
class PocoHttpClientFactory : public Aws::Http::HttpClientFactory
{
public:
~PocoHttpClientFactory() override = default;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient> CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
};
}

View File

@ -9,6 +9,11 @@
# include <aws/core/utils/logging/LogMacros.h>
# include <aws/core/utils/logging/LogSystemInterface.h>
# include <aws/s3/S3Client.h>
# include <aws/core/http/HttpClientFactory.h>
# include <IO/S3/PocoHttpClientFactory.h>
# include <IO/S3/PocoHttpClientFactory.cpp>
# include <IO/S3/PocoHttpClient.h>
# include <IO/S3/PocoHttpClient.cpp>
# include <boost/algorithm/string.hpp>
# include <Poco/URI.h>
# include <re2/re2.h>
@ -71,6 +76,7 @@ namespace S3
aws_options = Aws::SDKOptions{};
Aws::InitAPI(aws_options);
Aws::Utils::Logging::InitializeAWSLogging(std::make_shared<AWSLogger>());
Aws::Http::SetHttpClientFactory(std::make_shared<PocoHttpClientFactory>());
}
ClientFactory::~ClientFactory()

View File

@ -119,7 +119,6 @@ namespace
return Chunk(std::move(columns), num_rows);
}
reader->readSuffix();
reader.reset();
return {};