Merge pull request #13405 from excitoon-favorites/s3keepalive

Connection pools for S3
This commit is contained in:
Alexander Kuzmenkov 2021-01-18 12:52:37 +03:00 committed by GitHub
commit d22c04568d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 156 additions and 85 deletions

View File

@ -67,6 +67,7 @@ class IColumn;
M(UInt64, s3_min_upload_part_size, 512*1024*1024, "The minimum size of part to upload during multipart upload to S3.", 0) \
M(UInt64, s3_max_single_part_upload_size, 64*1024*1024, "The maximum size of object to upload using singlepart upload to S3.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, true, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \

View File

@ -120,6 +120,7 @@ void registerDiskS3(DiskFactory & factory)
cfg.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 10000);
cfg.httpRequestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 5000);
cfg.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
cfg.endpointOverride = uri.endpoint;
auto proxy_config = getProxyConfiguration(config_prefix, config);

View File

@ -106,23 +106,72 @@ namespace
const std::string host;
const UInt16 port;
bool https;
const String proxy_host;
const UInt16 proxy_port;
bool proxy_https;
bool resolve_host;
using Base = PoolBase<Poco::Net::HTTPClientSession>;
ObjectPtr allocObject() override
{
return makeHTTPSessionImpl(host, port, https, true);
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
if (!proxy_host.empty())
{
const String proxy_scheme = proxy_https ? "https" : "http";
session->setProxyHost(proxy_host);
session->setProxyPort(proxy_port);
#if !defined(ARCADIA_BUILD)
session->setProxyProtocol(proxy_scheme);
/// Turn on tunnel mode if proxy scheme is HTTP while endpoint scheme is HTTPS.
session->setProxyTunnel(!proxy_https && https);
#endif
}
return session;
}
public:
SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
: Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")), host(host_), port(port_), https(https_)
SingleEndpointHTTPSessionPool(
const std::string & host_,
UInt16 port_,
bool https_,
const std::string & proxy_host_,
UInt16 proxy_port_,
bool proxy_https_,
size_t max_pool_size_,
bool resolve_host_ = true)
: Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool"))
, host(host_)
, port(port_)
, https(https_)
, proxy_host(proxy_host_)
, proxy_port(proxy_port_)
, proxy_https(proxy_https_)
, resolve_host(resolve_host_)
{
}
};
class HTTPSessionPool : private boost::noncopyable
{
public:
struct Key
{
String target_host;
UInt16 target_port;
bool is_target_https;
String proxy_host;
UInt16 proxy_port;
bool is_proxy_https;
bool operator ==(const Key & rhs) const
{
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https)
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https);
}
};
private:
using Key = std::tuple<std::string, UInt16, bool>;
using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
using Entry = SingleEndpointHTTPSessionPool::Entry;
@ -131,9 +180,12 @@ namespace
size_t operator()(const Key & k) const
{
SipHash s;
s.update(std::get<0>(k));
s.update(std::get<1>(k));
s.update(std::get<2>(k));
s.update(k.target_host);
s.update(k.target_port);
s.update(k.is_target_https);
s.update(k.proxy_host);
s.update(k.proxy_port);
s.update(k.is_proxy_https);
return s.get64();
}
};
@ -153,18 +205,32 @@ namespace
Entry getSession(
const Poco::URI & uri,
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint)
size_t max_connections_per_endpoint,
bool resolve_host = true)
{
std::unique_lock lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto key = std::make_tuple(host, port, https);
String proxy_host;
UInt16 proxy_port = 0;
bool proxy_https = false;
if (!proxy_uri.empty())
{
proxy_host = proxy_uri.getHost();
proxy_port = proxy_uri.getPort();
proxy_https = isHTTPS(proxy_uri);
}
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https};
auto pool_ptr = endpoints_pool.find(key);
if (pool_ptr == endpoints_pool.end())
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint));
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host));
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto session = pool_ptr->second->get(retry_timeout);
@ -178,13 +244,17 @@ namespace
if (!msg.empty())
{
LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
/// Host can change IP
const auto ip = DNSResolver::instance().resolveHost(host).toString();
if (ip != session->getHost())
if (resolve_host)
{
session->reset();
session->setHost(ip);
session->attachSessionData({});
/// Host can change IP
const auto ip = DNSResolver::instance().resolveHost(host).toString();
if (ip != session->getHost())
{
session->reset();
session->setHost(ip);
session->attachSessionData({});
}
}
}
}
@ -218,9 +288,14 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
}
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
{
return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size);
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host);
}
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
{
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host);
}
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }

View File

@ -50,8 +50,9 @@ void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigne
/// Create session object to perform requests and set required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
/// As previous method creates session, but tooks it from pool
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size);
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status);

View File

@ -6,7 +6,7 @@
#include <utility>
#include <IO/HTTPCommon.h>
#include <IO/S3/PocoHTTPResponseStream.h>
#include <IO/S3/SessionAwareAwsStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/Stopwatch.h>
@ -86,6 +86,7 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & clientConfigu
))
, remote_host_filter(clientConfiguration.remote_host_filter)
, s3_max_redirects(clientConfiguration.s3_max_redirects)
, max_connections(clientConfiguration.maxConnections)
{
}
@ -164,28 +165,24 @@ void PocoHTTPClient::makeRequestInternal(
{
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
{
Poco::URI poco_uri(uri);
/// Reverse proxy can replace host header with resolved ip address instead of host name.
/// This can lead to request signature difference on S3 side.
auto session = makeHTTPSession(poco_uri, timeouts, false);
Poco::URI target_uri(uri);
Poco::URI proxy_uri;
auto request_configuration = per_request_configuration(request);
if (!request_configuration.proxyHost.empty())
{
/// Turn on tunnel mode if proxy scheme is HTTP while endpoint scheme is HTTPS.
bool use_tunnel = request_configuration.proxyScheme == Aws::Http::Scheme::HTTP && poco_uri.getScheme() == "https";
session->setProxy(
request_configuration.proxyHost,
request_configuration.proxyPort,
Aws::Http::SchemeMapper::ToString(request_configuration.proxyScheme),
use_tunnel
);
proxy_uri.setScheme(Aws::Http::SchemeMapper::ToString(request_configuration.proxyScheme));
proxy_uri.setHost(request_configuration.proxyHost);
proxy_uri.setPort(request_configuration.proxyPort);
}
/// Reverse proxy can replace host header with resolved ip address instead of host name.
/// This can lead to request signature difference on S3 side.
auto session = makePooledHTTPSession(target_uri, proxy_uri, timeouts, max_connections, false);
Poco::Net::HTTPRequest poco_request(Poco::Net::HTTPRequest::HTTP_1_1);
poco_request.setURI(poco_uri.getPathAndQuery());
poco_request.setURI(target_uri.getPathAndQuery());
switch (request.GetMethod())
{
@ -281,7 +278,7 @@ void PocoHTTPClient::makeRequestInternal(
}
}
else
response->GetResponseStream().SetUnderlyingStream(std::make_shared<PocoHTTPResponseStream>(session, response_body_stream));
response->GetResponseStream().SetUnderlyingStream(std::make_shared<SessionAwareAwsStream<decltype(session)>>(session, response_body_stream));
return;
}
@ -297,6 +294,7 @@ void PocoHTTPClient::makeRequestInternal(
ProfileEvents::increment(select_metric(S3MetricType::Errors));
}
}
}
#endif

View File

@ -56,6 +56,7 @@ private:
ConnectionTimeouts timeouts;
const RemoteHostFilter & remote_host_filter;
unsigned int s3_max_redirects;
unsigned int max_connections;
};
}

View File

@ -1,19 +0,0 @@
#include <Common/config.h>
#if USE_AWS_S3
#include "PocoHTTPResponseStream.h"
#include <utility>
namespace DB::S3
{
PocoHTTPResponseStream::PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_)
: Aws::IOStream(response_stream_.rdbuf()), session(std::move(session_))
{
}
}
#endif

View File

@ -1,21 +0,0 @@
#pragma once
#include <aws/core/utils/stream/ResponseStream.h>
#include <Poco/Net/HTTPClientSession.h>
namespace DB::S3
{
/**
* Wrapper of IStream to store response stream and corresponding HTTP session.
*/
class PocoHTTPResponseStream : public Aws::IOStream
{
public:
PocoHTTPResponseStream(std::shared_ptr<Poco::Net::HTTPClientSession> session_, std::istream & response_stream_);
private:
/// Poco HTTP session is holder of response stream.
std::shared_ptr<Poco::Net::HTTPClientSession> session;
};
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <IO/HTTPCommon.h>
#include <aws/core/utils/stream/ResponseStream.h>
namespace DB::S3
{
/**
* Wrapper of IOStream to store response stream and corresponding HTTP session.
*/
template <typename Session>
class SessionAwareAwsStream : public Aws::IStream
{
public:
SessionAwareAwsStream(Session session_, std::istream & response_stream_)
: Aws::IStream(response_stream_.rdbuf()), session(std::move(session_))
{
}
private:
/// Poco HTTP session is holder of response stream.
Session session;
};
}

View File

@ -280,7 +280,7 @@ namespace S3
}
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
Aws::Client::ClientConfiguration & cfg,
const Aws::Client::ClientConfiguration & cfg,
bool is_virtual_hosted_style,
const String & access_key_id,
const String & secret_access_key,
@ -306,7 +306,7 @@ namespace S3
}
std::shared_ptr<Aws::S3::S3Client> ClientFactory::create( // NOLINT
const String & endpoint,
const Aws::Client::ClientConfiguration & cfg,
bool is_virtual_hosted_style,
const String & access_key_id,
const String & secret_access_key,
@ -315,10 +315,7 @@ namespace S3
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects)
{
PocoHTTPClientConfiguration client_configuration({}, remote_host_filter, s3_max_redirects);
if (!endpoint.empty())
client_configuration.endpointOverride = endpoint;
PocoHTTPClientConfiguration client_configuration(cfg, remote_host_filter, s3_max_redirects);
client_configuration.updateSchemeAndRegion();

View File

@ -41,7 +41,7 @@ public:
unsigned int s3_max_redirects);
std::shared_ptr<Aws::S3::S3Client> create(
Aws::Client::ClientConfiguration & cfg,
const Aws::Client::ClientConfiguration & cfg,
bool is_virtual_hosted_style,
const String & access_key_id,
const String & secret_access_key,
@ -50,7 +50,7 @@ public:
unsigned int s3_max_redirects);
std::shared_ptr<Aws::S3::S3Client> create(
const String & endpoint,
const Aws::Client::ClientConfiguration & cfg,
bool is_virtual_hosted_style,
const String & access_key_id,
const String & secret_access_key,

View File

@ -196,6 +196,7 @@ StorageS3::StorageS3(
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
@ -220,8 +221,12 @@ StorageS3::StorageS3(
if (access_key_id_.empty())
credentials = Aws::Auth::AWSCredentials(std::move(settings.access_key_id), std::move(settings.secret_access_key));
Aws::Client::ClientConfiguration client_configuration;
client_configuration.endpointOverride = uri_.endpoint;
client_configuration.maxConnections = max_connections_;
client = S3::ClientFactory::instance().create(
uri_.endpoint,
client_configuration,
uri_.is_virtual_hosted_style,
credentials.GetAWSAccessKeyId(),
credentials.GetAWSSecretKey(),
@ -374,6 +379,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
UInt64 min_upload_part_size = args.local_context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = args.local_context.getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = args.local_context.getSettingsRef().s3_max_connections;
String compression_method;
String format_name;
@ -396,6 +402,7 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
format_name,
min_upload_part_size,
max_single_part_upload_size,
max_connections,
args.columns,
args.constraints,
args.context,

View File

@ -32,6 +32,7 @@ public:
const String & format_name_,
UInt64 min_upload_part_size_,
UInt64 max_single_part_upload_size_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,

View File

@ -68,6 +68,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
S3::URI s3_uri (uri);
UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context.getSettingsRef().s3_max_connections;
StoragePtr storage = StorageS3::create(
s3_uri,
@ -77,6 +78,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, const C
format,
min_upload_part_size,
max_single_part_upload_size,
max_connections,
getActualTableStructure(context),
ConstraintsDescription{},
const_cast<Context &>(context),