diff --git a/contrib/aws b/contrib/aws index f7d9ce39f41..17e10c0fc77 160000 --- a/contrib/aws +++ b/contrib/aws @@ -1 +1 @@ -Subproject commit f7d9ce39f41323300044567be007c233338bb94a +Subproject commit 17e10c0fc77f22afe890fa6d1b283760e5edaa56 diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index b8de483a5a8..0dfa80ca107 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -2,6 +2,8 @@ #include #include +#include +#include #include #include #include @@ -149,8 +151,7 @@ void PocoHTTPClient::MakeRequestInternal( response->SetClientErrorMessage(error_message); } else - /// TODO: Do not copy whole stream. - Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody()); + response->GetResponseStream().SetUnderlyingStream(std::make_shared(session, response_body_stream)); break; } diff --git a/src/IO/S3/PocoHTTPClientFactory.cpp b/src/IO/S3/PocoHTTPClientFactory.cpp index 033ad4af37c..e4b86593ec1 100644 --- a/src/IO/S3/PocoHTTPClientFactory.cpp +++ b/src/IO/S3/PocoHTTPClientFactory.cpp @@ -21,10 +21,12 @@ std::shared_ptr PocoHTTPClientFactory::CreateHttpRequest } std::shared_ptr PocoHTTPClientFactory::CreateHttpRequest( - const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const + const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory &) const { auto request = Aws::MakeShared("PocoHTTPClientFactory", uri, method); - request->SetResponseStreamFactory(streamFactory); + + /// Don't create default response stream. Actual response stream will be set later in PocoHTTPClient. + request->SetResponseStreamFactory(null_factory); return request; } diff --git a/src/IO/S3/PocoHTTPClientFactory.h b/src/IO/S3/PocoHTTPClientFactory.h index ac586289113..4e555f05502 100644 --- a/src/IO/S3/PocoHTTPClientFactory.h +++ b/src/IO/S3/PocoHTTPClientFactory.h @@ -4,22 +4,25 @@ namespace Aws::Http { - class HttpClient; - class HttpRequest; +class HttpClient; +class HttpRequest; } namespace DB::S3 { - class PocoHTTPClientFactory : public Aws::Http::HttpClientFactory { public: ~PocoHTTPClientFactory() override = default; - [[nodiscard]] std::shared_ptr CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override; + [[nodiscard]] std::shared_ptr + CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override; [[nodiscard]] std::shared_ptr CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; [[nodiscard]] std::shared_ptr CreateHttpRequest(const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; + +private: + const Aws::IOStreamFactory null_factory = []() { return nullptr; }; }; } diff --git a/src/IO/S3/PocoHTTPResponseStream.cpp b/src/IO/S3/PocoHTTPResponseStream.cpp new file mode 100644 index 00000000000..0a198268f2e --- /dev/null +++ b/src/IO/S3/PocoHTTPResponseStream.cpp @@ -0,0 +1,12 @@ +#include "PocoHTTPResponseStream.h" + +#include + +namespace DB::S3 +{ +PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr session_, std::istream & response_stream_) + : Aws::IStream(response_stream_.rdbuf()), session(std::move(session_)) +{ +} + +} diff --git a/src/IO/S3/PocoHTTPResponseStream.h b/src/IO/S3/PocoHTTPResponseStream.h new file mode 100644 index 00000000000..8167ddc4346 --- /dev/null +++ b/src/IO/S3/PocoHTTPResponseStream.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +namespace DB::S3 +{ +/** + * Wrapper of IStream to store response stream and corresponding HTTP session. + */ +class PocoHTTPResponseStream : public Aws::IStream +{ +public: + PocoHTTPResponseStream(std::shared_ptr session_, std::istream & response_stream_); + +private: + /// Poco HTTP session is holder of response stream. + std::shared_ptr session; +}; + +}