Merge pull request #11561 from Jokser/s3-http-client-memory-optimization

Avoid copying whole response stream into memory in S3 HTTP client.
This commit is contained in:
Nikita Mikhaylov 2020-06-10 16:53:33 +04:00 committed by GitHub
commit 9df693ced5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 48 additions and 9 deletions

2
contrib/aws vendored

@ -1 +1 @@
Subproject commit f7d9ce39f41323300044567be007c233338bb94a Subproject commit 17e10c0fc77f22afe890fa6d1b283760e5edaa56

View File

@ -2,6 +2,8 @@
#include <utility> #include <utility>
#include <IO/HTTPCommon.h> #include <IO/HTTPCommon.h>
#include <IO/S3/PocoHTTPResponseStream.h>
#include <IO/S3/PocoHTTPResponseStream.cpp>
#include <aws/core/http/HttpRequest.h> #include <aws/core/http/HttpRequest.h>
#include <aws/core/http/HttpResponse.h> #include <aws/core/http/HttpResponse.h>
#include <aws/core/http/standard/StandardHttpResponse.h> #include <aws/core/http/standard/StandardHttpResponse.h>
@ -149,8 +151,7 @@ void PocoHTTPClient::MakeRequestInternal(
response->SetClientErrorMessage(error_message); response->SetClientErrorMessage(error_message);
} }
else else
/// TODO: Do not copy whole stream. response->GetResponseStream().SetUnderlyingStream(std::make_shared<PocoHTTPResponseStream>(session, response_body_stream));
Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody());
break; break;
} }

View File

@ -21,10 +21,12 @@ std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest
} }
std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest( std::shared_ptr<Aws::Http::HttpRequest> PocoHTTPClientFactory::CreateHttpRequest(
const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory &) const
{ {
auto request = Aws::MakeShared<Aws::Http::Standard::StandardHttpRequest>("PocoHTTPClientFactory", uri, method); auto request = Aws::MakeShared<Aws::Http::Standard::StandardHttpRequest>("PocoHTTPClientFactory", uri, method);
request->SetResponseStreamFactory(streamFactory);
/// Don't create default response stream. Actual response stream will be set later in PocoHTTPClient.
request->SetResponseStreamFactory(null_factory);
return request; return request;
} }

View File

@ -4,22 +4,25 @@
namespace Aws::Http namespace Aws::Http
{ {
class HttpClient; class HttpClient;
class HttpRequest; class HttpRequest;
} }
namespace DB::S3 namespace DB::S3
{ {
class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory
{ {
public: public:
~PocoHTTPClientFactory() override = default; ~PocoHTTPClientFactory() override = default;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpClient> CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override; [[nodiscard]] std::shared_ptr<Aws::Http::HttpClient>
CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest> [[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
[[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest> [[nodiscard]] std::shared_ptr<Aws::Http::HttpRequest>
CreateHttpRequest(const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; CreateHttpRequest(const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override;
private:
const Aws::IOStreamFactory null_factory = []() { return nullptr; };
}; };
} }

View File

@ -0,0 +1,12 @@
#include "PocoHTTPResponseStream.h"
#include <utility>
namespace DB::S3
{
PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_)
: Aws::IStream(response_stream_.rdbuf()), session(std::move(session_))
{
}
}

View File

@ -0,0 +1,21 @@
#pragma once
#include <aws/core/utils/stream/ResponseStream.h>
#include <Poco/Net/HTTPClientSession.h>
namespace DB::S3
{
/**
* Wrapper of IStream to store response stream and corresponding HTTP session.
*/
class PocoHTTPResponseStream : public Aws::IStream
{
public:
PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_);
private:
/// Poco HTTP session is holder of response stream.
std::shared_ptr<Poco::Net::HTTPClientSession> session;
};
}