diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 00d4682332d..0b20f5fb033 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -67,6 +67,7 @@ class IColumn; M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \ M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \ M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \ + M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \ M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \ M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \ M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \ diff --git a/src/Disks/S3/registerDiskS3.cpp b/src/Disks/S3/registerDiskS3.cpp index fd658d95327..dc7020c9617 100644 --- a/src/Disks/S3/registerDiskS3.cpp +++ b/src/Disks/S3/registerDiskS3.cpp @@ -120,6 +120,7 @@ void registerDiskS3(DiskFactory & factory) cfg.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000); cfg.httpRequestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000); + cfg.maxConnections = config.getUInt(config_prefix + ".max_connections", 100); cfg.endpointOverride = uri.endpoint; auto proxy_config = getProxyConfiguration(config_prefix, config); diff --git a/src/IO/HTTPCommon.cpp b/src/IO/HTTPCommon.cpp index 91136d1fded..39951477779 100644 --- a/src/IO/HTTPCommon.cpp +++ b/src/IO/HTTPCommon.cpp @@ -106,23 +106,68 @@ namespace const std::string host; const UInt16 port; bool https; + const String proxy_host; + const UInt16 proxy_port; + bool proxy_https; + bool resolve_host; using Base = PoolBase; ObjectPtr allocObject() override { - return makeHTTPSessionImpl(host, port, https, true); + auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host); + if (!proxy_host.empty()) + { + const String proxy_scheme = proxy_https ? "https" : "http"; + session->setProxyHost(proxy_host); + session->setProxyPort(proxy_port); + +#if !defined(ARCADIA_BUILD) + session->setProxyProtocol(proxy_scheme); + + /// Turn on tunnel mode if proxy scheme is HTTP while endpoint scheme is HTTPS. + session->setProxyTunnel(!proxy_https && https); +#endif + } + return session; } public: - SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_) - : Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")), host(host_), port(port_), https(https_) + SingleEndpointHTTPSessionPool( + const std::string & host_, + UInt16 port_, + bool https_, + const std::string & proxy_host_, + UInt16 proxy_port_, + bool proxy_https_, + size_t max_pool_size_, + bool resolve_host_ = true) + : Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")) + , host(host_) + , port(port_) + , https(https_) + , proxy_host(proxy_host_) + , proxy_port(proxy_port_) + , proxy_https(proxy_https_) + , resolve_host(resolve_host_) { } }; class HTTPSessionPool : private boost::noncopyable { + public: + struct Key + { + String target_host; + UInt16 target_port; + bool is_target_https; + String proxy_host; + UInt16 proxy_port; + bool is_proxy_https; + + bool operator ==(const Key &) const = default; + }; + private: - using Key = std::tuple; using PoolPtr = std::shared_ptr; using Entry = SingleEndpointHTTPSessionPool::Entry; @@ -131,9 +176,12 @@ namespace size_t operator()(const Key & k) const { SipHash s; - s.update(std::get<0>(k)); - s.update(std::get<1>(k)); - s.update(std::get<2>(k)); + s.update(k.target_host); + s.update(k.target_port); + s.update(k.is_target_https); + s.update(k.proxy_host); + s.update(k.proxy_port); + s.update(k.is_proxy_https); return s.get64(); } }; @@ -153,18 +201,32 @@ namespace Entry getSession( const Poco::URI & uri, + const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, - size_t max_connections_per_endpoint) + size_t max_connections_per_endpoint, + bool resolve_host = true) { std::unique_lock lock(mutex); const std::string & host = uri.getHost(); UInt16 port = uri.getPort(); bool https = isHTTPS(uri); - auto key = std::make_tuple(host, port, https); + + + String proxy_host; + UInt16 proxy_port = 0; + bool proxy_https = false; + if (!proxy_uri.empty()) + { + proxy_host = proxy_uri.getHost(); + proxy_port = proxy_uri.getPort(); + proxy_https = isHTTPS(proxy_uri); + } + + HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https}; auto pool_ptr = endpoints_pool.find(key); if (pool_ptr == endpoints_pool.end()) std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace( - key, std::make_shared(host, port, https, max_connections_per_endpoint)); + key, std::make_shared(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host)); auto retry_timeout = timeouts.connection_timeout.totalMicroseconds(); auto session = pool_ptr->second->get(retry_timeout); @@ -178,13 +240,17 @@ namespace if (!msg.empty()) { LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg); - /// Host can change IP - const auto ip = DNSResolver::instance().resolveHost(host).toString(); - if (ip != session->getHost()) + + if (resolve_host) { - session->reset(); - session->setHost(ip); - session->attachSessionData({}); + /// Host can change IP + const auto ip = DNSResolver::instance().resolveHost(host).toString(); + if (ip != session->getHost()) + { + session->reset(); + session->setHost(ip); + session->attachSessionData({}); + } } } } @@ -218,9 +284,14 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & } -PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size) +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host) { - return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size); + return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host); +} + +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host) +{ + return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host); } bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; } diff --git a/src/IO/HTTPCommon.h b/src/IO/HTTPCommon.h index db0abe8fc6e..4a81d23a8a3 100644 --- a/src/IO/HTTPCommon.h +++ b/src/IO/HTTPCommon.h @@ -50,8 +50,9 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne /// Create session object to perform requests and set required parameters. HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true); -/// As previous method creates session, but tooks it from pool -PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size); +/// As previous method creates session, but tooks it from pool, without and with proxy uri. +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true); +PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true); bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status); diff --git a/src/IO/S3/PocoHTTPClient.cpp b/src/IO/S3/PocoHTTPClient.cpp index 57916251d9b..2389f9a2192 100644 --- a/src/IO/S3/PocoHTTPClient.cpp +++ b/src/IO/S3/PocoHTTPClient.cpp @@ -6,7 +6,7 @@ #include #include -#include +#include #include #include #include @@ -86,6 +86,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu )) , remote_host_filter(clientConfiguration.remote_host_filter) , s3_max_redirects(clientConfiguration.s3_max_redirects) + , max_connections(clientConfiguration.maxConnections) { } @@ -164,28 +165,24 @@ void PocoHTTPClient::makeRequestInternal( { for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt) { - Poco::URI poco_uri(uri); - - /// Reverse proxy can replace host header with resolved ip address instead of host name. - /// This can lead to request signature difference on S3 side. - auto session = makeHTTPSession(poco_uri, timeouts, false); + Poco::URI target_uri(uri); + Poco::URI proxy_uri; auto request_configuration = per_request_configuration(request); if (!request_configuration.proxyHost.empty()) { - /// Turn on tunnel mode if proxy scheme is HTTP while endpoint scheme is HTTPS. - bool use_tunnel = request_configuration.proxyScheme == Aws::Http::Scheme::HTTP && poco_uri.getScheme() == "https"; - session->setProxy( - request_configuration.proxyHost, - request_configuration.proxyPort, - Aws::Http::SchemeMapper::ToString(request_configuration.proxyScheme), - use_tunnel - ); + proxy_uri.setScheme(Aws::Http::SchemeMapper::ToString(request_configuration.proxyScheme)); + proxy_uri.setHost(request_configuration.proxyHost); + proxy_uri.setPort(request_configuration.proxyPort); } + /// Reverse proxy can replace host header with resolved ip address instead of host name. + /// This can lead to request signature difference on S3 side. + auto session = makePooledHTTPSession(target_uri, proxy_uri, timeouts, max_connections, false); + Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1); - poco_request.setURI(poco_uri.getPathAndQuery()); + poco_request.setURI(target_uri.getPathAndQuery()); switch (request.GetMethod()) { @@ -281,7 +278,7 @@ void PocoHTTPClient::makeRequestInternal( } } else - response->GetResponseStream().SetUnderlyingStream(std::make_shared(session, response_body_stream)); + response->GetResponseStream().SetUnderlyingStream(std::make_shared>(session, response_body_stream)); return; } @@ -297,6 +294,7 @@ void PocoHTTPClient::makeRequestInternal( ProfileEvents::increment(select_metric(S3MetricType::Errors)); } } + } #endif diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index 560f0a455f0..e4fc453f388 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -56,6 +56,7 @@ private: ConnectionTimeouts timeouts; const RemoteHostFilter & remote_host_filter; unsigned int s3_max_redirects; + unsigned int max_connections; }; } diff --git a/src/IO/S3/PocoHTTPResponseStream.cpp b/src/IO/S3/PocoHTTPResponseStream.cpp deleted file mode 100644 index 93f85d65e30..00000000000 --- a/src/IO/S3/PocoHTTPResponseStream.cpp +++ /dev/null @@ -1,19 +0,0 @@ -#include - -#if USE_AWS_S3 - - -#include "PocoHTTPResponseStream.h" - -#include - -namespace DB::S3 -{ -PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr session_, std::istream & response_stream_) - : Aws::IOStream(response_stream_.rdbuf()), session(std::move(session_)) -{ -} - -} - -#endif diff --git a/src/IO/S3/PocoHTTPResponseStream.h b/src/IO/S3/PocoHTTPResponseStream.h deleted file mode 100644 index fe3df6e52a7..00000000000 --- a/src/IO/S3/PocoHTTPResponseStream.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#include -#include - -namespace DB::S3 -{ -/** - * Wrapper of IStream to store response stream and corresponding HTTP session. - */ -class PocoHTTPResponseStream : public Aws::IOStream -{ -public: - PocoHTTPResponseStream(std::shared_ptr session_, std::istream & response_stream_); - -private: - /// Poco HTTP session is holder of response stream. - std::shared_ptr session; -}; - -} diff --git a/src/IO/S3/SessionAwareAwsStream.h b/src/IO/S3/SessionAwareAwsStream.h new file mode 100644 index 00000000000..70ddafba067 --- /dev/null +++ b/src/IO/S3/SessionAwareAwsStream.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +#include + + +namespace DB::S3 +{ +/** + * Wrapper of IOStream to store response stream and corresponding HTTP session. + */ +template +class SessionAwareAwsStream : public Aws::IOStream +{ +public: + SessionAwareAwsStream(Session session_, std::iostream & response_stream_) + : Aws::IStream(response_stream_.rdbuf()), session(std::move(session_)) + { + } + +private: + /// Poco HTTP session is holder of response stream. + Session session; +}; + +} diff --git a/src/IO/S3Common.cpp b/src/IO/S3Common.cpp index 06c51e058a0..d4c4ba9bb02 100644 --- a/src/IO/S3Common.cpp +++ b/src/IO/S3Common.cpp @@ -280,7 +280,7 @@ namespace S3 } std::shared_ptr ClientFactory::create( // NOLINT - Aws::Client::ClientConfiguration & cfg, + const Aws::Client::ClientConfiguration & cfg, bool is_virtual_hosted_style, const String & access_key_id, const String & secret_access_key, @@ -306,7 +306,7 @@ namespace S3 } std::shared_ptr ClientFactory::create( // NOLINT - const String & endpoint, + const Aws::Client::ClientConfiguration & cfg, bool is_virtual_hosted_style, const String & access_key_id, const String & secret_access_key, @@ -315,10 +315,7 @@ namespace S3 const RemoteHostFilter & remote_host_filter, unsigned int s3_max_redirects) { - PocoHTTPClientConfiguration client_configuration({}, remote_host_filter, s3_max_redirects); - - if (!endpoint.empty()) - client_configuration.endpointOverride = endpoint; + PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter, s3_max_redirects); client_configuration.updateSchemeAndRegion(); diff --git a/src/IO/S3Common.h b/src/IO/S3Common.h index 664c07d5bf4..e2ec0785811 100644 --- a/src/IO/S3Common.h +++ b/src/IO/S3Common.h @@ -41,7 +41,7 @@ public: unsigned int s3_max_redirects); std::shared_ptr create( - Aws::Client::ClientConfiguration & cfg, + const Aws::Client::ClientConfiguration & cfg, bool is_virtual_hosted_style, const String & access_key_id, const String & secret_access_key, @@ -50,7 +50,7 @@ public: unsigned int s3_max_redirects); std::shared_ptr create( - const String & endpoint, + const Aws::Client::ClientConfiguration & cfg, bool is_virtual_hosted_style, const String & access_key_id, const String & secret_access_key, diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 5d7fc0cdaa9..1920a84e447 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -196,6 +196,7 @@ StorageS3::StorageS3( const String & format_name_, UInt64 min_upload_part_size_, UInt64 max_single_part_upload_size_, + UInt64 max_connections_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_, @@ -220,8 +221,12 @@ StorageS3::StorageS3( if (access_key_id_.empty()) credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key)); + Aws::Client::ClientConfiguration client_configuration; + client_configuration.endpointOverride = uri_.endpoint; + client_configuration.maxConnections = max_connections_; + client = S3::ClientFactory::instance().create( - uri_.endpoint, + client_configuration, uri_.is_virtual_hosted_style, credentials.GetAWSAccessKeyId(), credentials.GetAWSSecretKey(), @@ -374,6 +379,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory) UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size; UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size; + UInt64 max_connections = args.local_context.getSettingsRef().s3_max_connections; String compression_method; String format_name; @@ -396,6 +402,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory) format_name, min_upload_part_size, max_single_part_upload_size, + max_connections, args.columns, args.constraints, args.context, diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index f436fb85c90..f006de39c99 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -32,6 +32,7 @@ public: const String & format_name_, UInt64 min_upload_part_size_, UInt64 max_single_part_upload_size_, + UInt64 max_connections_, const ColumnsDescription & columns_, const ConstraintsDescription & constraints_, const Context & context_, diff --git a/src/TableFunctions/TableFunctionS3.cpp b/src/TableFunctions/TableFunctionS3.cpp index cc7877b204e..6dc9230ca46 100644 --- a/src/TableFunctions/TableFunctionS3.cpp +++ b/src/TableFunctions/TableFunctionS3.cpp @@ -68,6 +68,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C S3::URI s3_uri (uri); UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size; UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size; + UInt64 max_connections = context.getSettingsRef().s3_max_connections; StoragePtr storage = StorageS3::create( s3_uri, @@ -77,6 +78,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C format, min_upload_part_size, max_single_part_upload_size, + max_connections, getActualTableStructure(context), ConstraintsDescription{}, const_cast(context),