This commit is contained in:
Aleksei Filatov 2023-07-14 12:09:22 +03:00 committed by Aleksei Filatov
parent 801d0955ec
commit 08defa36b2
5 changed files with 89 additions and 61 deletions

View File

@ -306,7 +306,7 @@ namespace Net
DEFAULT_KEEP_ALIVE_TIMEOUT = 8
};
void reconnect();
virtual void reconnect();
/// Connects the underlying socket to the HTTP server.
int write(const char * buffer, std::streamsize length);

View File

@ -60,7 +60,7 @@ ClientConfigurationPerRequest ProxyResolverConfiguration::getConfiguration(const
{
auto resolved_endpoint = endpoint;
resolved_endpoint.setHost(resolved_hosts[i].toString());
session = makeHTTPSession(resolved_endpoint, timeouts, false);
session = makeHTTPSession(resolved_endpoint, timeouts);
try
{

View File

@ -2,6 +2,7 @@
#include <Server/HTTP/HTTPServerResponse.h>
#include <Poco/Any.h>
#include <Common/Concepts.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/MemoryTrackerSwitcher.h>
@ -24,9 +25,9 @@
#include <Poco/Util/Application.h>
#include <sstream>
#include <tuple>
#include <unordered_map>
#include <sstream>
namespace ProfileEvents
@ -54,6 +55,78 @@ namespace
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
template <typename Session>
requires std::derived_from<Session, Poco::Net::HTTPClientSession>
class HTTPSessionAdapter : public Session
{
static_assert(std::has_virtual_destructor_v<Session>, "The base class must have a virtual destructor");
public:
HTTPSessionAdapter(const std::string & host, UInt16 port) : Session(host, port), log{&Poco::Logger::get("HTTPSessionAdapter")} { }
~HTTPSessionAdapter() override = default;
protected:
void reconnect() override
{
// First of all will try to establish connection with last used addr.
if (!Session::getResolvedHost().empty())
{
try
{
Session::reconnect();
return;
}
catch (...)
{
Session::close();
LOG_TRACE(
log,
"Last ip ({}) is unreachable for {}:{}. Will try another resolved address.",
Session::getResolvedHost(),
Session::getHost(),
Session::getPort());
}
}
const auto endpoinds = DNSResolver::instance().resolveHostAll(Session::getHost());
for (auto it = endpoinds.begin();;)
{
try
{
Session::setResolvedHost(it->toString());
Session::reconnect();
LOG_TRACE(
log,
"Created HTTP(S) session with {}:{} ({}:{})",
Session::getHost(),
Session::getPort(),
it->toString(),
Session::getPort());
break;
}
catch (...)
{
Session::close();
if (++it == endpoinds.end())
{
Session::setResolvedHost("");
throw;
}
LOG_TRACE(
log,
"Failed to create connection with {}:{}, Will try another resolved address. {}",
Session::getResolvedHost(),
Session::getPort(),
getCurrentExceptionMessage(false));
}
}
}
Poco::Logger * log;
};
bool isHTTPS(const Poco::URI & uri)
{
if (uri.getScheme() == "https")
@ -64,28 +137,21 @@ namespace
throw Exception(ErrorCodes::UNSUPPORTED_URI_SCHEME, "Unsupported scheme in URI '{}'", uri.toString());
}
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host = true)
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive)
{
HTTPSessionPtr session;
if (https)
{
#if USE_SSL
/// Cannot resolve host in advance, otherwise SNI won't work in Poco.
/// For more information about SNI, see the https://en.wikipedia.org/wiki/Server_Name_Indication
auto https_session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
if (resolve_host)
https_session->setResolvedHost(DNSResolver::instance().resolveHost(host).toString());
session = std::move(https_session);
session = std::make_shared<HTTPSessionAdapter<Poco::Net::HTTPSClientSession>>(host, port);
#else
throw Exception(ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME, "ClickHouse was built without HTTPS support");
#endif
}
else
{
String resolved_host = resolve_host ? DNSResolver::instance().resolveHost(host).toString() : host;
session = std::make_shared<Poco::Net::HTTPClientSession>(resolved_host, port);
session = std::make_shared<HTTPSessionAdapter<Poco::Net::HTTPClientSession>>(host, port);
}
ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);
@ -104,7 +170,6 @@ namespace
const String proxy_host;
const UInt16 proxy_port;
const bool proxy_https;
const bool resolve_host;
using Base = PoolBase<Poco::Net::HTTPClientSession>;
@ -113,7 +178,7 @@ namespace
/// Pool is global, we shouldn't attribute this memory to query/user.
MemoryTrackerSwitcher switcher{&total_memory_tracker};
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
auto session = makeHTTPSessionImpl(host, port, https, true);
if (!proxy_host.empty())
{
const String proxy_scheme = proxy_https ? "https" : "http";
@ -137,7 +202,6 @@ namespace
UInt16 proxy_port_,
bool proxy_https_,
size_t max_pool_size_,
bool resolve_host_,
bool wait_on_pool_size_limit)
: Base(
static_cast<unsigned>(max_pool_size_),
@ -149,7 +213,6 @@ namespace
, proxy_host(proxy_host_)
, proxy_port(proxy_port_)
, proxy_https(proxy_https_)
, resolve_host(resolve_host_)
{
}
};
@ -197,24 +260,6 @@ namespace
std::mutex mutex;
std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;
void updateHostIfIpChanged(Entry & session, const String & new_ip)
{
const auto old_ip = session->getResolvedHost().empty() ? session->getHost() : session->getResolvedHost();
if (new_ip != old_ip)
{
session->reset();
if (session->getResolvedHost().empty())
{
session->setHost(new_ip);
}
else
{
session->setResolvedHost(new_ip);
}
}
}
protected:
HTTPSessionPool() = default;
@ -230,7 +275,6 @@ namespace
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint,
bool resolve_host,
bool wait_on_pool_size_limit)
{
std::unique_lock lock(mutex);
@ -261,7 +305,6 @@ namespace
proxy_port,
proxy_https,
max_connections_per_endpoint,
resolve_host,
wait_on_pool_size_limit));
/// Some routines held session objects until the end of its lifetime. Also this routines may create another sessions in this time frame.
@ -273,17 +316,6 @@ namespace
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto session = pool_ptr->second->get(retry_timeout);
const auto & session_data = session->sessionData();
if (session_data.empty() || !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
{
session->reset();
if (resolve_host)
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
}
session->attachSessionData({});
setTimeouts(*session, timeouts);
return session;
@ -301,13 +333,13 @@ void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host)
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts)
{
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host);
auto session = makeHTTPSessionImpl(host, port, https, false);
setTimeouts(*session, timeouts);
return session;
}
@ -317,10 +349,9 @@ PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host,
bool wait_on_pool_size_limit)
{
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, wait_on_pool_size_limit);
}
PooledHTTPSessionPtr makePooledHTTPSession(
@ -328,10 +359,9 @@ PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host,
bool wait_on_pool_size_limit)
{
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, wait_on_pool_size_limit);
}
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }

View File

@ -70,14 +70,13 @@ void markSessionForReuse(PooledHTTPSessionPtr session);
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout);
/// Create session object to perform requests and set required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts);
/// As previous method creates session, but tooks it from pool, without and with proxy uri.
PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host = true,
bool wait_on_pool_size_limit = true);
PooledHTTPSessionPtr makePooledHTTPSession(
@ -85,7 +84,6 @@ PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host = true,
bool wait_on_pool_size_limit = true);
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);

View File

@ -336,9 +336,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
/// This can lead to request signature difference on S3 side.
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
session = makeHTTPSession(target_uri, timeouts);
bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https";
session->setProxy(
@ -352,9 +352,9 @@ void PocoHTTPClient::makeRequestInternalImpl(
{
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
target_uri, timeouts, http_connection_pool_size, wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
session = makeHTTPSession(target_uri, timeouts);
}
/// In case of error this address will be written to logs