diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index baa0fbcb883..13e8aac6906 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,6 +74,7 @@ if(USE_RDKAFKA) endif() if (USE_AWS_S3) + add_headers_and_sources(dbms Common/S3) add_headers_and_sources(dbms Disks/S3) endif() diff --git a/src/IO/HTTPCommon.cpp b/src/IO/HTTPCommon.cpp index 088ca0c246e..6b7f30cd9b6 100644 --- a/src/IO/HTTPCommon.cpp +++ b/src/IO/HTTPCommon.cpp @@ -68,7 +68,7 @@ namespace throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME); } - HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive) + HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host=true) { HTTPSessionPtr session; @@ -83,7 +83,10 @@ namespace ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections); - session->setHost(DNSResolver::instance().resolveHost(host).toString()); + if (resolve_host) + session->setHost(DNSResolver::instance().resolveHost(host).toString()); + else + session->setHost(host); session->setPort(port); /// doesn't work properly without patch @@ -202,13 +205,13 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds())); } -HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts) +HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host) { const std::string & host = uri.getHost(); UInt16 port = uri.getPort(); bool https = isHTTPS(uri); - auto session = makeHTTPSessionImpl(host, port, https, false); + auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host); setTimeouts(*session, timeouts); return session; } diff --git a/src/IO/HTTPCommon.h b/src/IO/HTTPCommon.h index 7592c1c31b3..66764b1c805 100644 --- a/src/IO/HTTPCommon.h +++ b/src/IO/HTTPCommon.h @@ -45,7 +45,7 @@ using HTTPSessionPtr = std::shared_ptr; void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout); /// Create session object to perform requests and set required parameters. -HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts); +HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true); /// As previous method creates session, but tooks it from pool PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size); diff --git a/src/IO/S3/PocoHttpClient.cpp b/src/IO/S3/PocoHttpClient.cpp new file mode 100644 index 00000000000..f7b71c956a7 --- /dev/null +++ b/src/IO/S3/PocoHttpClient.cpp @@ -0,0 +1,163 @@ +#include "PocoHttpClient.h" + +#include +#include +#include +#include +#include +#include +#include +#include "Poco/StreamCopier.h" +#include +#include +#include + +namespace DB::S3 +{ +PocoHttpClient::PocoHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) + : per_request_configuration(clientConfiguration.perRequestConfiguration) + , timeouts(ConnectionTimeouts( + Poco::Timespan(clientConfiguration.connectTimeoutMs * 1000), /// connection timeout. + Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000), /// send timeout. + Poco::Timespan(clientConfiguration.httpRequestTimeoutMs * 1000) /// receive timeout. + )) +{ +} + +std::shared_ptr PocoHttpClient::MakeRequest( + Aws::Http::HttpRequest & request, + Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const +{ + auto response = Aws::MakeShared("PocoHttpClient", request); + MakeRequestInternal(request, response, readLimiter, writeLimiter); + return response; +} + +std::shared_ptr PocoHttpClient::MakeRequest( + const std::shared_ptr & request, + Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const +{ + auto response = Aws::MakeShared("PocoHttpClient", request); + MakeRequestInternal(*request, response, readLimiter, writeLimiter); + return response; +} + +void PocoHttpClient::MakeRequestInternal( + Aws::Http::HttpRequest & request, + std::shared_ptr & response, + Aws::Utils::RateLimits::RateLimiterInterface *, + Aws::Utils::RateLimits::RateLimiterInterface *) const +{ + auto uri = request.GetUri().GetURIString(); + LOG_DEBUG(&Logger::get("AWSClient"), "Make request to: {}", uri); + + const int MAX_REDIRECT_ATTEMPTS = 10; + try + { + for (int attempt = 0; attempt < MAX_REDIRECT_ATTEMPTS; ++attempt) + { + Poco::URI poco_uri(uri); + + /// Reverse proxy can replace host header with resolved ip address instead of host name. + /// This can lead to request signature difference on S3 side. + auto session = makeHTTPSession(poco_uri, timeouts, false); + + auto request_configuration = per_request_configuration(request); + if (!request_configuration.proxyHost.empty()) + session->setProxy(request_configuration.proxyHost, request_configuration.proxyPort); + + Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1); + + poco_request.setURI(poco_uri.getPathAndQuery()); + + switch (request.GetMethod()) + { + case Aws::Http::HttpMethod::HTTP_GET: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_GET); + break; + case Aws::Http::HttpMethod::HTTP_POST: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_POST); + break; + case Aws::Http::HttpMethod::HTTP_DELETE: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_DELETE); + break; + case Aws::Http::HttpMethod::HTTP_PUT: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PUT); + break; + case Aws::Http::HttpMethod::HTTP_HEAD: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_HEAD); + break; + case Aws::Http::HttpMethod::HTTP_PATCH: + poco_request.setMethod(Poco::Net::HTTPRequest::HTTP_PATCH); + break; + } + + for (const auto & [header_name, header_value] : request.GetHeaders()) + poco_request.set(header_name, header_value); + + Poco::Net::HTTPResponse poco_response; + auto & request_body_stream = session->sendRequest(poco_request); + + if (request.GetContentBody()) + { + LOG_DEBUG(&Logger::get("AWSClient"), "Writing request body..."); + if (attempt > 0) /// rewind content body buffer. + { + request.GetContentBody()->clear(); + request.GetContentBody()->seekg(0); + } + auto size = Poco::StreamCopier::copyStream(*request.GetContentBody(), request_body_stream); + LOG_DEBUG(&Logger::get("AWSClient"), "Written {} bytes to request body", size); + } + + LOG_DEBUG(&Logger::get("AWSClient"), "Receiving response..."); + auto & response_body_stream = session->receiveResponse(poco_response); + + int status_code = static_cast(poco_response.getStatus()); + LOG_DEBUG(&Logger::get("AWSClient"), "Response status: {}, {}", status_code, poco_response.getReason()); + + if (poco_response.getStatus() == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT) + { + auto location = poco_response.get("location"); + uri = location; + LOG_DEBUG(&Logger::get("AWSClient"), "Redirecting request to new location: {}", location); + + continue; + } + + response->SetResponseCode(static_cast(status_code)); + response->SetContentType(poco_response.getContentType()); + + std::stringstream headers_ss; + for (const auto & [header_name, header_value] : poco_response) + { + response->AddHeader(header_name, header_value); + headers_ss << header_name << " : " << header_value << "; "; + } + LOG_DEBUG(&Logger::get("AWSClient"), "Received headers: {}", headers_ss.str()); + + if (status_code >= 300) + { + String error_message; + Poco::StreamCopier::copyToString(response_body_stream, error_message); + + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(error_message); + } + else + /// TODO: Do not copy whole stream. + Poco::StreamCopier::copyStream(response_body_stream, response->GetResponseBody()); + + break; + } + } + catch (...) + { + tryLogCurrentException(&Logger::get("AWSClient"), "Failed to make request to: " + uri); + response->SetClientErrorType(Aws::Client::CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage(getCurrentExceptionMessage(false)); + } +} +} diff --git a/src/IO/S3/PocoHttpClient.h b/src/IO/S3/PocoHttpClient.h new file mode 100644 index 00000000000..7458c692a0e --- /dev/null +++ b/src/IO/S3/PocoHttpClient.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include + +namespace Aws::Http::Standard +{ +class StandardHttpResponse; +} + +namespace DB::S3 +{ + +class PocoHttpClient : public Aws::Http::HttpClient +{ +public: + explicit PocoHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration); + ~PocoHttpClient() override = default; + std::shared_ptr MakeRequest( + Aws::Http::HttpRequest & request, + Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const override; + + std::shared_ptr MakeRequest( + const std::shared_ptr & request, + Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const override; + +private: + void MakeRequestInternal( + Aws::Http::HttpRequest & request, + std::shared_ptr & response, + Aws::Utils::RateLimits::RateLimiterInterface * readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const; + + std::function per_request_configuration; + ConnectionTimeouts timeouts; +}; + +} diff --git a/src/IO/S3/PocoHttpClientFactory.cpp b/src/IO/S3/PocoHttpClientFactory.cpp new file mode 100644 index 00000000000..96a73e1d001 --- /dev/null +++ b/src/IO/S3/PocoHttpClientFactory.cpp @@ -0,0 +1,32 @@ +#include "PocoHttpClientFactory.h" + +#include +#include +#include +#include +#include + +namespace DB::S3 +{ +std::shared_ptr +PocoHttpClientFactory::CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const +{ + return std::make_shared(clientConfiguration); +} + +std::shared_ptr PocoHttpClientFactory::CreateHttpRequest( + const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const +{ + return CreateHttpRequest(Aws::Http::URI(uri), method, streamFactory); +} + +std::shared_ptr PocoHttpClientFactory::CreateHttpRequest( + const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const +{ + auto request = Aws::MakeShared("PocoHttpClientFactory", uri, method); + request->SetResponseStreamFactory(streamFactory); + + return request; +} + +} diff --git a/src/IO/S3/PocoHttpClientFactory.h b/src/IO/S3/PocoHttpClientFactory.h new file mode 100644 index 00000000000..ac73a0356ff --- /dev/null +++ b/src/IO/S3/PocoHttpClientFactory.h @@ -0,0 +1,25 @@ +#pragma once + +#include + +namespace Aws::Http +{ + class HttpClient; + class HttpRequest; +} + +namespace DB::S3 +{ + +class PocoHttpClientFactory : public Aws::Http::HttpClientFactory +{ +public: + ~PocoHttpClientFactory() override = default; + [[nodiscard]] std::shared_ptr CreateHttpClient(const Aws::Client::ClientConfiguration & clientConfiguration) const override; + [[nodiscard]] std::shared_ptr + CreateHttpRequest(const Aws::String & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; + [[nodiscard]] std::shared_ptr + CreateHttpRequest(const Aws::Http::URI & uri, Aws::Http::HttpMethod method, const Aws::IOStreamFactory & streamFactory) const override; +}; + +} diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index b67a1723aca..9aa218b254a 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -9,6 +9,11 @@ # include # include # include +# include +# include +# include +# include +# include # include # include # include @@ -71,6 +76,7 @@ namespace S3 aws_options = Aws::SDKOptions{}; Aws::InitAPI(aws_options); Aws::Utils::Logging::InitializeAWSLogging(std::make_shared()); + Aws::Http::SetHttpClientFactory(std::make_shared()); } ClientFactory::~ClientFactory() diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 22fe4656f17..e8fd89c4505 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -119,7 +119,6 @@ namespace return Chunk(std::move(columns), num_rows); } - reader->readSuffix(); reader.reset(); return {};