Updated AWS C++ SDK.

This commit is contained in:
Vladimir Chebotarev 2021-01-26 10:49:16 +03:00
parent 99fee8e117
commit 3322cc812d
6 changed files with 87 additions and 50 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit a220591e335923ce1c19bbf9eb925787f7ab6c13
Subproject commit 7d48b2c8193679cc4516e5bd68ae4a64b94dae7d

View File

@ -6,13 +6,11 @@
#include <utility>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareAwsStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/Stopwatch.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
#include <aws/core/monitoring/HttpClientMetrics.h>
#include <aws/core/utils/ratelimiter/RateLimiterInterface.h>
#include "Poco/StreamCopier.h"
@ -90,29 +88,19 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
{
}
std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
Aws::Http::HttpRequest & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
auto response = Aws::MakeShared<Aws::Http::Standard::StandardHttpResponse>("PocoHTTPClient", request);
makeRequestInternal(request, response, readLimiter, writeLimiter);
return response;
}
std::shared_ptr<Aws::Http::HttpResponse> PocoHTTPClient::MakeRequest(
const std::shared_ptr<Aws::Http::HttpRequest> & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
auto response = Aws::MakeShared<Aws::Http::Standard::StandardHttpResponse>("PocoHTTPClient", request);
auto response = Aws::MakeShared<PocoHTTPResponse>("PocoHTTPClient", request);
makeRequestInternal(*request, response, readLimiter, writeLimiter);
return response;
}
void PocoHTTPClient::makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<Aws::Http::Standard::StandardHttpResponse> & response,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface *,
Aws::Utils::RateLimits::RateLimiterInterface *) const
{
@ -278,7 +266,7 @@ void PocoHTTPClient::makeRequestInternal(
}
}
else
response->GetResponseStream().SetUnderlyingStream(std::make_shared<SessionAwareAwsStream<decltype(session)>>(session, response_body_stream));
response->SetResponseBody(response_body_stream, session);
return;
}

View File

@ -2,9 +2,12 @@
#include <Common/RemoteHostFilter.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
namespace Aws::Http::Standard
{
@ -30,15 +33,43 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
void updateSchemeAndRegion();
};
class PocoHTTPResponse : public Aws::Http::Standard::StandardHttpResponse
{
public:
using SessionPtr = PooledHTTPSessionPtr;
PocoHTTPResponse(const std::shared_ptr<const Aws::Http::HttpRequest> request)
: Aws::Http::Standard::StandardHttpResponse(request)
, body_stream(request->GetResponseStreamFactory())
{
}
void SetResponseBody(Aws::IStream & incoming_stream, SessionPtr & session_)
{
body_stream = Aws::Utils::Stream::ResponseStream(
Aws::New<SessionAwareIOStream<SessionPtr>>("http result streambuf", session_, incoming_stream.rdbuf())
);
}
Aws::IOStream & GetResponseBody() const override
{
return body_stream.GetUnderlyingStream();
}
Aws::Utils::Stream::ResponseStream && SwapResponseStreamOwnership() override
{
return std::move(body_stream);
}
private:
Aws::Utils::Stream::ResponseStream body_stream;
};
class PocoHTTPClient : public Aws::Http::HttpClient
{
public:
explicit PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfiguration);
~PocoHTTPClient() override = default;
std::shared_ptr<Aws::Http::HttpResponse> MakeRequest(
Aws::Http::HttpRequest & request,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const override;
std::shared_ptr<Aws::Http::HttpResponse> MakeRequest(
const std::shared_ptr<Aws::Http::HttpRequest> & request,
@ -48,7 +79,7 @@ public:
private:
void makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<Aws::Http::Standard::StandardHttpResponse> & response,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;

View File

@ -1,27 +0,0 @@
#pragma once
#include <IO/HTTPCommon.h>
#include <aws/core/utils/stream/ResponseStream.h>
namespace DB::S3
{
/**
* Wrapper of IOStream to store response stream and corresponding HTTP session.
*/
template <typename Session>
class SessionAwareAwsStream : public Aws::IStream
{
public:
SessionAwareAwsStream(Session session_, std::istream & response_stream_)
: Aws::IStream(response_stream_.rdbuf()), session(std::move(session_))
{
}
private:
/// Poco HTTP session is holder of response stream.
Session session;
};
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <iostream>
namespace DB::S3
{
/**
* Wrapper of IOStream to store response stream and corresponding HTTP session.
*/
template <typename Session>
class SessionAwareIOStream : public std::iostream
{
public:
SessionAwareIOStream(Session session_, std::streambuf * sb)
: std::iostream(sb)
, session(std::move(session_))
{
}
private:
/// Poco HTTP session is holder of response stream.
Session session;
};
}

View File

@ -207,13 +207,32 @@ public:
return result;
}
bool SignRequest(Aws::Http::HttpRequest & request, const char * region, const char * service_name, bool sign_body) const override
{
auto result = Aws::Client::AWSAuthV4Signer::SignRequest(request, region, service_name, sign_body);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
bool PresignRequest(
Aws::Http::HttpRequest & request,
const char * region,
const char * serviceName,
long long expiration_time_sec) const override // NOLINT
{
auto result = Aws::Client::AWSAuthV4Signer::PresignRequest(request, region, serviceName, expiration_time_sec);
auto result = Aws::Client::AWSAuthV4Signer::PresignRequest(request, region, expiration_time_sec);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;
}
bool PresignRequest(
Aws::Http::HttpRequest & request,
const char * region,
const char * service_name,
long long expiration_time_sec) const override // NOLINT
{
auto result = Aws::Client::AWSAuthV4Signer::PresignRequest(request, region, service_name, expiration_time_sec);
for (const auto & header : headers)
request.SetHeaderValue(header.name, header.value);
return result;