ClickHouse/src/IO/HTTPCommon.cpp

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

342 lines
12 KiB
C++
Raw Normal View History

#include <IO/HTTPCommon.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
2019-02-10 17:40:52 +00:00
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
#include "config.h"
#if USE_SSL
# include <Poco/Net/AcceptCertificateHandler.h>
# include <Poco/Net/Context.h>
# include <Poco/Net/HTTPSClientSession.h>
# include <Poco/Net/InvalidCertificateHandler.h>
# include <Poco/Net/PrivateKeyPassphraseHandler.h>
# include <Poco/Net/RejectCertificateHandler.h>
# include <Poco/Net/SSLManager.h>
# include <Poco/Net/SecureStreamSocket.h>
#endif
2018-11-18 22:15:42 +00:00
#include <Poco/Util/Application.h>
2017-01-30 05:13:58 +00:00
2019-02-10 17:40:52 +00:00
#include <tuple>
#include <unordered_map>
2018-06-15 07:42:57 +00:00
#include <sstream>
2018-11-18 22:15:42 +00:00
/// Profile event counters used by this file; only declared here (extern), defined elsewhere.
namespace ProfileEvents
{
/// Incremented by makeHTTPSessionImpl each time it creates a new HTTP(S) client session object.
extern const Event CreatedHTTPConnections;
}
namespace DB
{
/// Error codes thrown from this file; only declared here (extern), defined elsewhere.
namespace ErrorCodes
{
    /// Used by assertResponseIsOk for any unsuccessful HTTP status other than "too many requests".
    extern const int RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;
    /// Used by assertResponseIsOk when the status is HTTP_TOO_MANY_REQUESTS.
    extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
    /// Thrown when an HTTPS session is requested but the build has no SSL support.
    extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
    /// Thrown by isHTTPS for URI schemes other than "http" and "https".
    extern const int UNSUPPORTED_URI_SCHEME;
}
2018-06-16 05:54:06 +00:00
namespace
{
2019-05-31 15:50:21 +00:00
/// Copies the connection/send/receive timeouts and the keep-alive timeout from `timeouts` onto `session`.
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
    const auto & connect = timeouts.connection_timeout;
    const auto & send = timeouts.send_timeout;
    const auto & receive = timeouts.receive_timeout;
    session.setTimeout(connect, send, receive);

    const auto & keep_alive = timeouts.http_keep_alive_timeout;
    session.setKeepAliveTimeout(keep_alive);
}
/// Returns true for an "https" URI and false for an "http" URI.
/// Any other scheme is rejected with UNSUPPORTED_URI_SCHEME.
bool isHTTPS(const Poco::URI & uri)
{
    const auto & scheme = uri.getScheme();

    if (scheme == "https")
        return true;

    if (scheme == "http")
        return false;

    throw Exception(ErrorCodes::UNSUPPORTED_URI_SCHEME, "Unsupported scheme in URI '{}'", uri.toString());
}
/// Creates a new (not yet connected) HTTP or HTTPS client session for `host`:`port`.
///
/// @param host          Target host name as given by the caller.
/// @param port          Target port.
/// @param https         Create an HTTPS session; throws FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME if built without SSL.
/// @param keep_alive    Value passed to setKeepAlive on the new session.
/// @param resolve_host  If true, the host is resolved through DNSResolver and the session uses the resolved address.
/// @return The new session. CreatedHTTPConnections is incremented once per call.
HTTPSessionPtr makeHTTPSessionImpl(const std::string & host, UInt16 port, bool https, bool keep_alive, bool resolve_host = true)
{
    HTTPSessionPtr session;

    if (https)
    {
#if USE_SSL
        /// The HTTPS session is constructed with the original host name and the resolved address is attached
        /// separately (presumably so that the TLS layer still sees the name - confirm against the Poco patch).
        /// The host is resolved exactly once, and only when resolution was requested.
        auto https_session = std::make_shared<Poco::Net::HTTPSClientSession>(host, port);
        if (resolve_host)
            https_session->setResolvedHost(DNSResolver::instance().resolveHost(host).toString());

        session = std::move(https_session);
#else
        throw Exception(ErrorCodes::FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME, "ClickHouse was built without HTTPS support");
#endif
    }
    else
    {
        /// A plain HTTP session is created directly against the resolved address (or the raw host if not resolving).
        String resolved_host = resolve_host ? DNSResolver::instance().resolveHost(host).toString() : host;
        session = std::make_shared<Poco::Net::HTTPClientSession>(resolved_host, port);
    }

    ProfileEvents::increment(ProfileEvents::CreatedHTTPConnections);

    /// doesn't work properly without patch
    session->setKeepAlive(keep_alive);
    return session;
}
/// Pool of keep-alive HTTP(S) sessions that all target one endpoint, optionally through one proxy.
/// New sessions are produced on demand by allocObject().
class SingleEndpointHTTPSessionPool : public PoolBase<Poco::Net::HTTPClientSession>
{
private:
    using Base = PoolBase<Poco::Net::HTTPClientSession>;

    /// Target endpoint.
    const std::string host;
    const UInt16 port;
    const bool https;

    /// Proxy endpoint; an empty proxy_host means "no proxy".
    const String proxy_host;
    const UInt16 proxy_port;
    const bool proxy_https;

    /// Forwarded to makeHTTPSessionImpl for every session created by this pool.
    const bool resolve_host;

    /// Creates one more session for the endpoint and, if a proxy is set, configures the proxy on it.
    ObjectPtr allocObject() override
    {
        auto new_session = makeHTTPSessionImpl(host, port, https, /* keep_alive = */ true, resolve_host);

        if (proxy_host.empty())
            return new_session;

        const String scheme_of_proxy = proxy_https ? "https" : "http";
        /// Turn on tunnel mode if proxy scheme is HTTP while endpoint scheme is HTTPS.
        const bool use_tunnel = https && !proxy_https;

        new_session->setProxyHost(proxy_host);
        new_session->setProxyPort(proxy_port);
        new_session->setProxyProtocol(scheme_of_proxy);
        new_session->setProxyTunnel(use_tunnel);

        return new_session;
    }

public:
    /// @param max_pool_size_  Upper bound on pooled sessions, passed to the base pool (narrowed to unsigned).
    /// The remaining parameters are stored as-is and used when sessions are allocated.
    SingleEndpointHTTPSessionPool(
        const std::string & host_,
        UInt16 port_,
        bool https_,
        const std::string & proxy_host_,
        UInt16 proxy_port_,
        bool proxy_https_,
        size_t max_pool_size_,
        bool resolve_host_ = true)
        : Base(static_cast<unsigned>(max_pool_size_), &Poco::Logger::get("HTTPSessionPool"))
        , host(host_)
        , port(port_)
        , https(https_)
        , proxy_host(proxy_host_)
        , proxy_port(proxy_port_)
        , proxy_https(proxy_https_)
        , resolve_host(resolve_host_)
    {
    }
};
class HTTPSessionPool : private boost::noncopyable
{
public:
struct Key
{
String target_host;
UInt16 target_port;
bool is_target_https;
String proxy_host;
UInt16 proxy_port;
bool is_proxy_https;
2020-12-21 07:48:26 +00:00
bool operator ==(const Key & rhs) const
{
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https)
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https);
}
};
private:
using PoolPtr = std::shared_ptr<SingleEndpointHTTPSessionPool>;
using Entry = SingleEndpointHTTPSessionPool::Entry;
struct Hasher
{
size_t operator()(const Key & k) const
{
SipHash s;
s.update(k.target_host);
s.update(k.target_port);
s.update(k.is_target_https);
s.update(k.proxy_host);
s.update(k.proxy_port);
s.update(k.is_proxy_https);
return s.get64();
}
};
std::mutex mutex;
std::unordered_map<Key, PoolPtr, Hasher> endpoints_pool;
protected:
HTTPSessionPool() = default;
public:
static auto & instance()
{
static HTTPSessionPool instance;
return instance;
}
2019-03-29 18:10:03 +00:00
Entry getSession(
const Poco::URI & uri,
const Poco::URI & proxy_uri,
2019-03-29 18:10:03 +00:00
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint,
bool resolve_host = true)
{
std::lock_guard lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
String proxy_host;
UInt16 proxy_port = 0;
bool proxy_https = false;
if (!proxy_uri.empty())
{
proxy_host = proxy_uri.getHost();
proxy_port = proxy_uri.getPort();
proxy_https = isHTTPS(proxy_uri);
}
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https};
auto pool_ptr = endpoints_pool.find(key);
if (pool_ptr == endpoints_pool.end())
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host));
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto session = pool_ptr->second->get(retry_timeout);
/// We store exception messages in session data.
/// Poco HTTPSession also stores exception, but it can be removed at any time.
const auto & session_data = session->sessionData();
if (!session_data.empty())
{
auto msg = Poco::AnyCast<std::string>(session_data);
if (!msg.empty())
{
2020-05-30 21:57:37 +00:00
LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
if (resolve_host)
{
/// Host can change IP
const auto ip = DNSResolver::instance().resolveHost(host).toString();
if (ip != session->getHost())
{
session->reset();
session->setHost(ip);
}
}
}
Do not report "Failed communicating with" on and on for parts exchange For parts exchange right now an error message in stored in the HTTPSession, via HTTPSession::attachSessionData(), and for each request this HTTPSession::sessionData() is checked and if not empty printed into the log. However it should be reported only once, otherwise you will get this message on and on, even for different tables. Here is an example of such messages: 2022.07.13 07:56:09.342997 [ 683 ] {} <Error> test_juq8qk.rmt2 (ede90518-4710-48bb-ab43-caf0b1b157f4): auto DB::StorageReplicatedMergeTree::processQueueEntry(ReplicatedMergeTreeQueue::SelectedEntryPtr)::(anonymous class)::operator()(DB::StorageReplicatedMergeTree::LogEntryPtr &) const: Code: 86. DB::Exception: Received error from remote server /?endpoint=DataPartsExchange%3A%2Ftest%2F01165%2Ftest_juq8qk%2Frmt%2Freplicas%2F1&part=206--1406300905-20220713-1657673769_0_0_0&client_protocol_version=7&compress=false&remote_fs_metadata=s3. HTTP status code: 500 Internal Server Error, body: Code: 236. DB::Exception: Transferring part to replica was cancelled. (ABORTED) (version 22.7.1.1781 (official build)). 
(RECEIVED_ERROR_FROM_REMOTE_IO_SERVER), Stack trace (when copying this message, always include the lines below): # this is the time when this message is written ^ 2022.07.13 07:56:10.528554 [ 814 ] {} <Information> DatabaseCatalog: Removing metadata /var/lib/clickhouse/metadata_dropped/test_juq8qk.rmt2.ede90518-4710-48bb-ab43-caf0b1b157f4.sql of dropped table test_juq8qk.rmt2 (ede90518-4710-48bb-ab43-caf0b1b157f4) # now this table had been removed ^ 2022.07.13 07:56:27.442003 [ 683 ] {} <Debug> test_1orbeb.mutations_and_quorum2 (bc811afd-1f57-4f9c-a04d-9e1e1b60e891): Fetching part 201901_0_0_0 from /clickhouse/tables/test_1orbeb/test_01090/mutations_and_quorum/replicas/1 # here fetch part is scheduled for another table ^ 2022.07.13 07:56:27.442213 [ 683 ] {} <Trace> HTTPCommon: Failed communicating with 250c6af4615d with error 'Received error from remote server /?endpoint=DataPartsExchange%3A%2Ftest%2F01165%2Ftest_juq8qk%2Frmt%2Freplicas%2F1&part=255--1372061156-20220713-1657673769_0_0_0&client_protocol_version=7&compress=false&remote_fs_metadata=s3. HTTP status code: 500 Internal Server Error, body: Code: 236. DB::Exception: Transferring part to replica was cancelled. (ABORTED) (version 22.7.1.1781 (official build))' will try to reconnect session # however it still reports an error for the already removed table test_juq8qk.rmt 2022.07.13 07:56:27.442246 [ 683 ] {} <Trace> ReadWriteBufferFromHTTP: Sending request to http://250c6af4615d:9009/?endpoint=DataPartsExchange%3A%2Fclickhouse%2Ftables%2Ftest_1orbeb% # but this is just a noisy message and it still doing the job correctly ^ Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
2022-07-14 12:10:14 +00:00
/// Reset the message, once it has been printed,
/// otherwise you will get report for failed parts on and on,
/// even for different tables (since they uses the same session).
session->attachSessionData({});
}
setTimeouts(*session, timeouts);
2019-03-29 18:10:03 +00:00
return session;
}
};
}
/// Adds a "Keep-Alive: timeout=<seconds>" header to `response`.
/// Nothing is added when keep-alive is off for the response or when the timeout is zero seconds.
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout)
{
    if (response.getKeepAlive())
    {
        const Poco::Timespan timeout(keep_alive_timeout, 0);
        const auto seconds = timeout.totalSeconds();
        if (seconds)
            response.set("Keep-Alive", "timeout=" + std::to_string(seconds));
    }
}
/// Creates a standalone (non-pooled, keep-alive disabled) session for `uri` and applies `timeouts` to it.
/// Throws UNSUPPORTED_URI_SCHEME if the URI scheme is neither http nor https.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host)
{
    const std::string & target_host = uri.getHost();
    const UInt16 target_port = uri.getPort();
    const bool use_https = isHTTPS(uri);

    auto result = makeHTTPSessionImpl(target_host, target_port, use_https, /* keep_alive = */ false, resolve_host);
    setTimeouts(*result, timeouts);

    return result;
}
/// Convenience overload: pooled session for `uri` without a proxy (an empty proxy URI is forwarded).
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
{
    const Poco::URI no_proxy;
    return makePooledHTTPSession(uri, no_proxy, timeouts, per_endpoint_pool_size, resolve_host);
}
/// Obtains a pooled session for `uri` (via `proxy_uri` when it is not empty) from the global HTTPSessionPool.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
{
    auto & global_pool = HTTPSessionPool::instance();
    return global_pool.getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host);
}
/// Returns true for the redirect statuses this file recognises:
/// MOVED_PERMANENTLY, FOUND, SEE_OTHER and TEMPORARY_REDIRECT.
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status)
{
    using Response = Poco::Net::HTTPResponse;

    if (status == Response::HTTP_MOVED_PERMANENTLY)
        return true;
    if (status == Response::HTTP_FOUND)
        return true;
    if (status == Response::HTTP_SEE_OTHER)
        return true;
    return status == Response::HTTP_TEMPORARY_REDIRECT;
}
2018-06-16 05:54:06 +00:00
/// Receives the response headers into `response`, validates the status via assertResponseIsOk
/// (which throws on an unsuccessful status) and returns a pointer to the response body stream.
std::istream * receiveResponse(
    Poco::Net::HTTPClientSession & session, const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, const bool allow_redirects)
{
    std::istream & response_body = session.receiveResponse(response);
    assertResponseIsOk(request, response, response_body, allow_redirects);
    return &response_body;
}
2019-09-23 08:53:09 +00:00
/// Throws HTTPException unless the response status is OK, CREATED, ACCEPTED, PARTIAL_CONTENT,
/// or a redirect while `allow_redirects` is true.
///
/// @param request          Only its URI is used, for the exception message.
/// @param response         Response whose status and reason are inspected.
/// @param istr             Response body stream; consumed for the exception message on failure.
/// @param allow_redirects  Whether redirect statuses count as success.
/// @throws HTTPException with RECEIVED_ERROR_TOO_MANY_REQUESTS for HTTP_TOO_MANY_REQUESTS,
///         otherwise with RECEIVED_ERROR_FROM_REMOTE_IO_SERVER.
void assertResponseIsOk(const Poco::Net::HTTPRequest & request, Poco::Net::HTTPResponse & response, std::istream & istr, const bool allow_redirects)
{
    auto status = response.getStatus();

    if (!(status == Poco::Net::HTTPResponse::HTTP_OK
        || status == Poco::Net::HTTPResponse::HTTP_CREATED
        || status == Poco::Net::HTTPResponse::HTTP_ACCEPTED
        || status == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT /// Reading with Range header was successful.
        || (isRedirect(status) && allow_redirects)))
    {
        int code = status == Poco::Net::HTTPResponse::HTTP_TOO_MANY_REQUESTS
            ? ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
            : ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER;

        std::stringstream body; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
        body.exceptions(std::ios::failbit);

        /// Inserting a stream buffer sets failbit when no characters are transferred, which - with the
        /// exception mask above - would raise std::ios_base::failure for an error response with an empty
        /// body and hide the real HTTP error. Copy the body only if there is something to read.
        if (istr.peek() != std::istream::traits_type::eof())
            body << istr.rdbuf();

        throw HTTPException(code, request.getURI(), status, response.getReason(), body.str());
    }
}
2018-06-16 05:54:06 +00:00
2023-01-17 16:39:07 +00:00
/// Builds the Exception carried by HTTPException: error `code` plus a message containing
/// the request URI, the numeric HTTP status, the reason phrase and the response body.
Exception HTTPException::makeExceptionMessage(
    int code,
    const std::string & uri,
    Poco::Net::HTTPResponse::HTTPStatus http_status,
    const std::string & reason,
    const std::string & body)
{
    /// The status enum is printed as its numeric value.
    const int numeric_status = static_cast<int>(http_status);

    return Exception(
        code,
        "Received error from remote server {}. HTTP status code: {} {}, body: {}",
        uri,
        numeric_status,
        reason,
        body);
}
}