ClickHouse/src/IO/HTTPCommon.cpp

257 lines
8.8 KiB
C++
Raw Normal View History

#include <IO/HTTPCommon.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
2019-02-10 17:40:52 +00:00
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include <Poco/Version.h>
2018-11-18 22:15:42 +00:00
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>
#endif
#if USE_SSL
# include <Poco/Net/AcceptCertificateHandler.h>
# include <Poco/Net/Context.h>
# include <Poco/Net/HTTPSClientSession.h>
# include <Poco/Net/InvalidCertificateHandler.h>
# include <Poco/Net/PrivateKeyPassphraseHandler.h>
# include <Poco/Net/RejectCertificateHandler.h>
# include <Poco/Net/SSLManager.h>
# include <Poco/Net/SecureStreamSocket.h>
#endif
2018-11-18 22:15:42 +00:00
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/Application.h>
2017-01-30 05:13:58 +00:00
2019-02-10 17:40:52 +00:00
#include <tuple>
#include <unordered_map>
2018-06-15 07:42:57 +00:00
#include <sstream>
2018-11-18 22:15:42 +00:00
namespace ProfileEvents
{
extern const Event CreatedHTTPConnections;
}
namespace DB
{
namespace ErrorCodes
{
extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
2018-06-16 05:54:06 +00:00
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
extern const int UNSUPPORTED_URI_SCHEME;
}
2018-06-16 05:54:06 +00:00
namespace
{
2019-05-31 15:50:21 +00:00
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
2019-02-15 11:46:07 +00:00
#if defined(POCO_CLICKHOUSE_PATCH) || POCO_VERSION >= 0x02000000
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
#else
session.setTimeout(std::max({timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout}));
#endif
2019-03-29 18:10:03 +00:00
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
bool isHTTPS(const Poco::URI & uri)
{
if (uri.getScheme() == "https")
return true;
else if (uri.getScheme() == "http")
return false;
else
throw Exception("Unsupported scheme in URI '" + uri.toString() + "'", ErrorCodes::UNSUPPORTED_URI_SCHEME);
}
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host = true)
{
HTTPSessionPtr session;
if (https)
{
#if USE_SSL
/// Cannot resolve host in advance, otherwise SNI won't work in Poco.
session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
#else
throw Exception("ClickHouse was built without HTTPS support", ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME);
#endif
}
else
{
String resolved_host = resolve_host ? DNSResolver::instance().resolveHost(host).toString() : host;
session = std::make_shared<Poco::Net::HTTPClientSession>(resolved_host, port);
}
ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);
/// doesn't work properly without patch
2019-02-15 11:46:07 +00:00
#if defined(POCO_CLICKHOUSE_PATCH)
session->setKeepAlive(keep_alive);
#else
(void)keep_alive; // Avoid warning: unused parameter
#endif
return session;
}
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
const std::string host;
const UInt16 port;
bool https;
using Base = PoolBase<Poco::Net::HTTPClientSession>;
ObjectPtr allocObject() override
{
return makeHTTPSessionImpl(host, port, https, true);
}
public:
SingleEndpointHTTPSessionPool(const std::string & host_, UInt16 port_, bool https_, size_t max_pool_size_)
2018-11-18 22:20:36 +00:00
: Base(max_pool_size_, &Poco::Logger::get("HTTPSessionPool")), host(host_), port(port_), https(https_)
{
}
};
class HTTPSessionPool : private boost::noncopyable
{
private:
using Key = std::tuple<std::string, UInt16, bool>;
using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
using Entry = SingleEndpointHTTPSessionPool::Entry;
struct Hasher
{
size_t operator()(const Key & k) const
{
SipHash s;
s.update(std::get<0>(k));
s.update(std::get<1>(k));
s.update(std::get<2>(k));
return s.get64();
}
};
std::mutex mutex;
std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;
protected:
HTTPSessionPool() = default;
public:
static auto & instance()
{
static HTTPSessionPool instance;
return instance;
}
2019-03-29 18:10:03 +00:00
Entry getSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint)
{
2019-01-02 06:44:36 +00:00
std::unique_lock lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto key = std::make_tuple(host, port, https);
auto pool_ptr = endpoints_pool.find(key);
if (pool_ptr == endpoints_pool.end())
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, max_connections_per_endpoint));
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto session = pool_ptr->second->get(retry_timeout);
/// We store exception messages in session data.
/// Poco HTTPSession also stores exception, but it can be removed at any time.
const auto & session_data = session->sessionData();
if (!session_data.empty())
{
auto msg = Poco::AnyCast<std::string>(session_data);
if (!msg.empty())
{
2020-05-30 21:57:37 +00:00
LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
/// Host can change IP
const auto ip = DNSResolver::instance().resolveHost(host).toString();
if (ip != session->getHost())
{
session->reset();
session->setHost(ip);
session->attachSessionData({});
}
}
}
setTimeouts(*session, timeouts);
2019-03-29 18:10:03 +00:00
return session;
}
};
}
void setResponseDefaultHeaders(Poco::Net::HTTPServerResponse & response, unsigned keep_alive_timeout)
2016-12-31 02:05:37 +00:00
{
if (!response.getKeepAlive())
return;
Poco::Timespan timeout(keep_alive_timeout, 0);
if (timeout.totalSeconds())
response.set("Keep-Alive", "timeout=" + std::to_string(timeout.totalSeconds()));
}
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host)
{
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
auto session = makeHTTPSessionImpl(host, port, https, false, resolve_host);
setTimeouts(*session, timeouts);
return session;
}
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size)
{
return HTTPSessionPool::instance().getSession(uri, timeouts, per_endpoint_pool_size);
}
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }
2018-06-16 05:54:06 +00:00
std::istream * receiveResponse(
Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, const bool allow_redirects)
{
2019-06-17 17:32:57 +00:00
auto & istr = session.receiveResponse(response);
2019-09-23 08:53:09 +00:00
assertResponseIsOk(request, response, istr, allow_redirects);
2019-06-17 17:32:57 +00:00
return &istr;
2019-05-31 10:58:43 +00:00
}
2019-09-23 08:53:09 +00:00
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects)
2019-06-17 18:06:28 +00:00
{
auto status = response.getStatus();
2020-10-29 21:29:10 +00:00
if (!(status == Poco::Net::HTTPResponse::HTTP_OK
|| status == Poco::Net::HTTPResponse::HTTP_CREATED
|| status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
|| (isRedirect(status) && allow_redirects)))
{
2020-11-10 18:22:26 +00:00
std::stringstream error_message; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
2020-11-07 00:14:53 +00:00
error_message.exceptions(std::ios::failbit);
error_message << "Received error from remote server " << request.getURI() << ". HTTP status code: " << status << " "
2019-06-17 17:32:57 +00:00
<< response.getReason() << ", body: " << istr.rdbuf();
throw Exception(error_message.str(),
status == HTTP_TOO_MANY_REQUESTS ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
: ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER);
}
}
2018-06-16 05:54:06 +00:00
}