mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
Merge pull request #62249 from ClickHouse/chesema-keep-alive-send-and-receive
several fixes for client's keep alive connections
This commit is contained in:
commit
04ea8a85f2
@ -213,6 +213,19 @@ namespace Net
|
||||
Poco::Timespan getKeepAliveTimeout() const;
|
||||
/// Returns the connection timeout for HTTP connections.
|
||||
|
||||
void setKeepAliveMaxRequests(int max_requests);
|
||||
|
||||
int getKeepAliveMaxRequests() const;
|
||||
|
||||
int getKeepAliveRequest() const;
|
||||
|
||||
bool isKeepAliveExpired(double reliability = 1.0) const;
|
||||
/// Returns if the connection is expired with some margin as fraction of timeout as reliability
|
||||
|
||||
double getKeepAliveReliability() const;
|
||||
/// Returns the current fraction of keep alive timeout when connection is considered safe to use
|
||||
/// It helps to avoid situation when a client uses nearly expired connection and receives NoMessageException
|
||||
|
||||
virtual std::ostream & sendRequest(HTTPRequest & request);
|
||||
/// Sends the header for the given HTTP request to
|
||||
/// the server.
|
||||
@ -345,6 +358,8 @@ namespace Net
|
||||
|
||||
void assign(HTTPClientSession & session);
|
||||
|
||||
void setKeepAliveRequest(int request);
|
||||
|
||||
HTTPSessionFactory _proxySessionFactory;
|
||||
/// Factory to create HTTPClientSession to proxy.
|
||||
private:
|
||||
@ -353,6 +368,8 @@ namespace Net
|
||||
Poco::UInt16 _port;
|
||||
ProxyConfig _proxyConfig;
|
||||
Poco::Timespan _keepAliveTimeout;
|
||||
int _keepAliveCurrentRequest = 0;
|
||||
int _keepAliveMaxRequests = 1000;
|
||||
Poco::Timestamp _lastRequest;
|
||||
bool _reconnect;
|
||||
bool _mustReconnect;
|
||||
@ -361,6 +378,7 @@ namespace Net
|
||||
Poco::SharedPtr<std::ostream> _pRequestStream;
|
||||
Poco::SharedPtr<std::istream> _pResponseStream;
|
||||
|
||||
static const double _defaultKeepAliveReliabilityLevel;
|
||||
static ProxyConfig _globalProxyConfig;
|
||||
|
||||
HTTPClientSession(const HTTPClientSession &);
|
||||
@ -450,9 +468,19 @@ namespace Net
|
||||
return _lastRequest;
|
||||
}
|
||||
|
||||
inline void HTTPClientSession::setLastRequest(Poco::Timestamp time)
|
||||
inline double HTTPClientSession::getKeepAliveReliability() const
|
||||
{
|
||||
_lastRequest = time;
|
||||
return _defaultKeepAliveReliabilityLevel;
|
||||
}
|
||||
|
||||
inline int HTTPClientSession::getKeepAliveMaxRequests() const
|
||||
{
|
||||
return _keepAliveMaxRequests;
|
||||
}
|
||||
|
||||
inline int HTTPClientSession::getKeepAliveRequest() const
|
||||
{
|
||||
return _keepAliveCurrentRequest;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -120,6 +120,10 @@ namespace Net
|
||||
/// The value is set to "Keep-Alive" if keepAlive is
|
||||
/// true, or to "Close" otherwise.
|
||||
|
||||
void setKeepAliveTimeout(int timeout, int max_requests);
|
||||
int getKeepAliveTimeout() const;
|
||||
int getKeepAliveMaxRequests() const;
|
||||
|
||||
bool getKeepAlive() const;
|
||||
/// Returns true if
|
||||
/// * the message has a Connection header field and its value is "Keep-Alive"
|
||||
|
@ -44,7 +44,7 @@ namespace Net
|
||||
/// - timeout: 60 seconds
|
||||
/// - keepAlive: true
|
||||
/// - maxKeepAliveRequests: 0
|
||||
/// - keepAliveTimeout: 10 seconds
|
||||
/// - keepAliveTimeout: 15 seconds
|
||||
|
||||
void setServerName(const std::string & serverName);
|
||||
/// Sets the name and port (name:port) that the server uses to identify itself.
|
||||
|
@ -56,6 +56,8 @@ namespace Net
|
||||
SocketAddress serverAddress();
|
||||
/// Returns the server's address.
|
||||
|
||||
void setKeepAliveTimeout(Poco::Timespan keepAliveTimeout);
|
||||
|
||||
private:
|
||||
bool _firstRequest;
|
||||
Poco::Timespan _keepAliveTimeout;
|
||||
|
@ -37,6 +37,7 @@ namespace Net {
|
||||
|
||||
|
||||
HTTPClientSession::ProxyConfig HTTPClientSession::_globalProxyConfig;
|
||||
const double HTTPClientSession::_defaultKeepAliveReliabilityLevel = 0.9;
|
||||
|
||||
|
||||
HTTPClientSession::HTTPClientSession():
|
||||
@ -220,7 +221,41 @@ void HTTPClientSession::setGlobalProxyConfig(const ProxyConfig& config)
|
||||
|
||||
void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout)
|
||||
{
|
||||
_keepAliveTimeout = timeout;
|
||||
if (connected())
|
||||
{
|
||||
throw Poco::IllegalStateException("cannot change keep alive timeout on initiated connection, "
|
||||
"That value is managed privately after connection is established.");
|
||||
}
|
||||
_keepAliveTimeout = timeout;
|
||||
}
|
||||
|
||||
|
||||
void HTTPClientSession::setKeepAliveMaxRequests(int max_requests)
|
||||
{
|
||||
if (connected())
|
||||
{
|
||||
throw Poco::IllegalStateException("cannot change keep alive max requests on initiated connection, "
|
||||
"That value is managed privately after connection is established.");
|
||||
}
|
||||
_keepAliveMaxRequests = max_requests;
|
||||
}
|
||||
|
||||
|
||||
void HTTPClientSession::setKeepAliveRequest(int request)
|
||||
{
|
||||
_keepAliveCurrentRequest = request;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void HTTPClientSession::setLastRequest(Poco::Timestamp time)
|
||||
{
|
||||
if (connected())
|
||||
{
|
||||
throw Poco::IllegalStateException("cannot change last request on initiated connection, "
|
||||
"That value is managed privately after connection is established.");
|
||||
}
|
||||
_lastRequest = time;
|
||||
}
|
||||
|
||||
|
||||
@ -231,6 +266,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
|
||||
clearException();
|
||||
_responseReceived = false;
|
||||
|
||||
_keepAliveCurrentRequest += 1;
|
||||
|
||||
bool keepAlive = getKeepAlive();
|
||||
if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty())
|
||||
{
|
||||
@ -241,8 +278,10 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
|
||||
{
|
||||
if (!connected())
|
||||
reconnect();
|
||||
if (!keepAlive)
|
||||
request.setKeepAlive(false);
|
||||
if (!request.has(HTTPMessage::CONNECTION))
|
||||
request.setKeepAlive(keepAlive);
|
||||
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
|
||||
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds(), _keepAliveMaxRequests);
|
||||
if (!request.has(HTTPRequest::HOST) && !_host.empty())
|
||||
request.setHost(_host, _port);
|
||||
if (!_proxyConfig.host.empty() && !bypassProxy())
|
||||
@ -324,6 +363,17 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response)
|
||||
|
||||
_mustReconnect = getKeepAlive() && !response.getKeepAlive();
|
||||
|
||||
if (!_mustReconnect)
|
||||
{
|
||||
/// when server sends its keep alive timeout, client has to follow that value
|
||||
auto timeout = response.getKeepAliveTimeout();
|
||||
if (timeout > 0)
|
||||
_keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0));
|
||||
auto max_requests = response.getKeepAliveMaxRequests();
|
||||
if (max_requests > 0)
|
||||
_keepAliveMaxRequests = std::min(_keepAliveMaxRequests, max_requests);
|
||||
}
|
||||
|
||||
if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
|
||||
_pResponseStream = new HTTPFixedLengthInputStream(*this, 0);
|
||||
else if (response.getChunkedTransferEncoding())
|
||||
@ -430,15 +480,18 @@ std::string HTTPClientSession::proxyRequestPrefix() const
|
||||
return result;
|
||||
}
|
||||
|
||||
bool HTTPClientSession::isKeepAliveExpired(double reliability) const
|
||||
{
|
||||
Poco::Timestamp now;
|
||||
return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest
|
||||
|| _keepAliveCurrentRequest > _keepAliveMaxRequests;
|
||||
}
|
||||
|
||||
bool HTTPClientSession::mustReconnect() const
|
||||
{
|
||||
if (!_mustReconnect)
|
||||
{
|
||||
Poco::Timestamp now;
|
||||
return _keepAliveTimeout <= now - _lastRequest;
|
||||
}
|
||||
else return true;
|
||||
return isKeepAliveExpired(_defaultKeepAliveReliabilityLevel);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
@ -511,14 +564,21 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session)
|
||||
if (buffered())
|
||||
throw Poco::LogicException("assign to a session with not empty buffered data");
|
||||
|
||||
attachSocket(session.detachSocket());
|
||||
setLastRequest(session.getLastRequest());
|
||||
poco_assert(!connected());
|
||||
|
||||
setResolvedHost(session.getResolvedHost());
|
||||
setKeepAlive(session.getKeepAlive());
|
||||
setProxyConfig(session.getProxyConfig());
|
||||
|
||||
setTimeout(session.getConnectionTimeout(), session.getSendTimeout(), session.getReceiveTimeout());
|
||||
setKeepAlive(session.getKeepAlive());
|
||||
|
||||
setLastRequest(session.getLastRequest());
|
||||
setKeepAliveTimeout(session.getKeepAliveTimeout());
|
||||
setProxyConfig(session.getProxyConfig());
|
||||
|
||||
_keepAliveMaxRequests = session._keepAliveMaxRequests;
|
||||
_keepAliveCurrentRequest = session._keepAliveCurrentRequest;
|
||||
|
||||
attachSocket(session.detachSocket());
|
||||
|
||||
session.reset();
|
||||
}
|
||||
|
@ -17,6 +17,7 @@
|
||||
#include "Poco/NumberFormatter.h"
|
||||
#include "Poco/NumberParser.h"
|
||||
#include "Poco/String.h"
|
||||
#include <format>
|
||||
|
||||
|
||||
using Poco::NumberFormatter;
|
||||
@ -179,4 +180,51 @@ bool HTTPMessage::getKeepAlive() const
|
||||
}
|
||||
|
||||
|
||||
void HTTPMessage::setKeepAliveTimeout(int timeout, int max_requests)
|
||||
{
|
||||
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max={}", timeout, max_requests));
|
||||
}
|
||||
|
||||
|
||||
int parseFromHeaderValues(const std::string_view header_value, const std::string_view param_name)
|
||||
{
|
||||
auto param_value_pos = header_value.find(param_name);
|
||||
if (param_value_pos == std::string::npos)
|
||||
param_value_pos = header_value.size();
|
||||
if (param_value_pos != header_value.size())
|
||||
param_value_pos += param_name.size();
|
||||
|
||||
auto param_value_end = header_value.find(',', param_value_pos);
|
||||
if (param_value_end == std::string::npos)
|
||||
param_value_end = header_value.size();
|
||||
|
||||
auto timeout_value_substr = header_value.substr(param_value_pos, param_value_end - param_value_pos);
|
||||
if (timeout_value_substr.empty())
|
||||
return -1;
|
||||
|
||||
int value = 0;
|
||||
auto [ptr, ec] = std::from_chars(timeout_value_substr.begin(), timeout_value_substr.end(), value);
|
||||
|
||||
if (ec == std::errc())
|
||||
return value;
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int HTTPMessage::getKeepAliveTimeout() const
|
||||
{
|
||||
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
|
||||
static const std::string_view timeout_param = "timeout=";
|
||||
return parseFromHeaderValues(ka_header, timeout_param);
|
||||
}
|
||||
|
||||
|
||||
int HTTPMessage::getKeepAliveMaxRequests() const
|
||||
{
|
||||
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
|
||||
static const std::string_view timeout_param = "max=";
|
||||
return parseFromHeaderValues(ka_header, timeout_param);
|
||||
}
|
||||
|
||||
} } // namespace Poco::Net
|
||||
|
@ -88,7 +88,18 @@ void HTTPServerConnection::run()
|
||||
|
||||
pHandler->handleRequest(request, response);
|
||||
session.setKeepAlive(_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
|
||||
}
|
||||
|
||||
/// all that fuzz is all about to make session close with less timeout than 15s (set in HTTPServerParams c-tor)
|
||||
if (_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive())
|
||||
{
|
||||
int value = response.getKeepAliveTimeout();
|
||||
if (value < 0)
|
||||
value = request.getKeepAliveTimeout();
|
||||
if (value > 0)
|
||||
session.setKeepAliveTimeout(Poco::Timespan(value, 0));
|
||||
}
|
||||
|
||||
}
|
||||
else sendErrorResponse(session, HTTPResponse::HTTP_NOT_IMPLEMENTED);
|
||||
}
|
||||
catch (Poco::Exception&)
|
||||
|
@ -33,6 +33,12 @@ HTTPServerSession::~HTTPServerSession()
|
||||
{
|
||||
}
|
||||
|
||||
void HTTPServerSession::setKeepAliveTimeout(Poco::Timespan keepAliveTimeout)
|
||||
{
|
||||
_keepAliveTimeout = keepAliveTimeout;
|
||||
}
|
||||
|
||||
|
||||
|
||||
bool HTTPServerSession::hasMoreRequests()
|
||||
{
|
||||
|
@ -16,6 +16,7 @@
|
||||
#include <Poco/Net/HTTPRequest.h>
|
||||
#include <Poco/Net/HTTPResponse.h>
|
||||
#include <Poco/Net/HTTPStream.h>
|
||||
#include <Poco/Net/NetException.h>
|
||||
#include <Poco/Timespan.h>
|
||||
|
||||
#include <queue>
|
||||
@ -83,17 +84,15 @@ namespace
|
||||
}
|
||||
|
||||
|
||||
size_t roundUp(size_t x, size_t rounding)
|
||||
constexpr size_t roundUp(size_t x, size_t rounding)
|
||||
{
|
||||
chassert(rounding > 0);
|
||||
return (x + (rounding - 1)) / rounding * rounding;
|
||||
}
|
||||
|
||||
|
||||
Poco::Timespan divide(const Poco::Timespan span, int divisor)
|
||||
{
|
||||
return Poco::Timespan(Poco::Timestamp::TimeDiff(span.totalMicroseconds() / divisor));
|
||||
return (x + rounding) / rounding * rounding;
|
||||
}
|
||||
static_assert(roundUp(10000, 100) == 10100);
|
||||
static_assert(roundUp(10001, 100) == 10100);
|
||||
static_assert(roundUp(10099, 100) == 10100);
|
||||
static_assert(roundUp(10100, 100) == 10200);
|
||||
}
|
||||
|
||||
namespace DB
|
||||
@ -202,8 +201,9 @@ public:
|
||||
|
||||
if (total_connections_in_group >= limits.warning_limit && total_connections_in_group >= mute_warning_until)
|
||||
{
|
||||
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}", type, total_connections_in_group, limits.warning_limit);
|
||||
mute_warning_until = roundUp(total_connections_in_group, HTTPConnectionPools::Limits::warning_step);
|
||||
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}, next warning at {}",
|
||||
type, total_connections_in_group, limits.warning_limit, mute_warning_until);
|
||||
}
|
||||
}
|
||||
|
||||
@ -213,7 +213,8 @@ public:
|
||||
|
||||
--total_connections_in_group;
|
||||
|
||||
const size_t reduced_warning_limit = limits.warning_limit > 10 ? limits.warning_limit - 10 : 1;
|
||||
const size_t gap = 20;
|
||||
const size_t reduced_warning_limit = limits.warning_limit > gap ? limits.warning_limit - gap : 1;
|
||||
if (mute_warning_until > 0 && total_connections_in_group < reduced_warning_limit)
|
||||
{
|
||||
LOG_WARNING(log, "Sessions count is OK in the group {}, count {}", type, total_connections_in_group);
|
||||
@ -273,9 +274,15 @@ private:
|
||||
public:
|
||||
using Ptr = std::shared_ptr<PooledConnection>;
|
||||
|
||||
using Session::mustReconnect;
|
||||
|
||||
void markAsExpired()
|
||||
{
|
||||
isExpired = true;
|
||||
}
|
||||
|
||||
void reconnect() override
|
||||
{
|
||||
ProfileEvents::increment(metrics.reset);
|
||||
Session::close();
|
||||
|
||||
if (auto lock = pool.lock())
|
||||
@ -283,6 +290,7 @@ private:
|
||||
auto timeouts = getTimeouts(*this);
|
||||
auto new_connection = lock->getConnection(timeouts);
|
||||
Session::assign(*new_connection);
|
||||
Session::setKeepAliveRequest(Session::getKeepAliveRequest() + 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -304,6 +312,12 @@ private:
|
||||
Session::getPort());
|
||||
}
|
||||
|
||||
Poco::Timespan idleTime()
|
||||
{
|
||||
Poco::Timestamp now;
|
||||
return now - Session::getLastRequest();
|
||||
}
|
||||
|
||||
void flushRequest() override
|
||||
{
|
||||
if (bool(request_stream))
|
||||
@ -335,6 +349,7 @@ private:
|
||||
|
||||
std::ostream & sendRequest(Poco::Net::HTTPRequest & request) override
|
||||
{
|
||||
auto idle = idleTime();
|
||||
std::ostream & result = Session::sendRequest(request);
|
||||
result.exceptions(std::ios::badbit);
|
||||
|
||||
@ -392,10 +407,11 @@ private:
|
||||
}
|
||||
response_stream = nullptr;
|
||||
|
||||
if (auto lock = pool.lock())
|
||||
lock->atConnectionDestroy(*this);
|
||||
else
|
||||
ProfileEvents::increment(metrics.reset);
|
||||
group->atConnectionDestroy();
|
||||
|
||||
if (!isExpired)
|
||||
if (auto lock = pool.lock())
|
||||
lock->atConnectionDestroy(*this);
|
||||
|
||||
CurrentMetrics::sub(metrics.active_count);
|
||||
}
|
||||
@ -404,10 +420,18 @@ private:
|
||||
friend class EndpointConnectionPool;
|
||||
|
||||
template <class... Args>
|
||||
explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args)
|
||||
: Session(args...), pool(std::move(pool_)), metrics(std::move(metrics_))
|
||||
explicit PooledConnection(
|
||||
EndpointConnectionPool::WeakPtr pool_,
|
||||
ConnectionGroup::Ptr group_,
|
||||
IHTTPConnectionPoolForEndpoint::Metrics metrics_,
|
||||
Args &&... args)
|
||||
: Session(std::forward<Args>(args)...)
|
||||
, pool(std::move(pool_))
|
||||
, group(group_)
|
||||
, metrics(std::move(metrics_))
|
||||
{
|
||||
CurrentMetrics::add(metrics.active_count);
|
||||
group->atConnectionCreate();
|
||||
}
|
||||
|
||||
template <class... Args>
|
||||
@ -433,10 +457,12 @@ private:
|
||||
return request_stream_completed && response_stream_completed;
|
||||
}
|
||||
|
||||
WeakPtr pool;
|
||||
EndpointConnectionPool::WeakPtr pool;
|
||||
ConnectionGroup::Ptr group;
|
||||
IHTTPConnectionPoolForEndpoint::Metrics metrics;
|
||||
bool isExpired = false;
|
||||
|
||||
Poco::Logger * log = &Poco::Logger::get("PooledConnection");
|
||||
LoggerPtr log = getLogger("PooledConnection");
|
||||
|
||||
std::ostream * request_stream = nullptr;
|
||||
std::istream * response_stream = nullptr;
|
||||
@ -484,7 +510,6 @@ public:
|
||||
|
||||
IHTTPConnectionPoolForEndpoint::ConnectionPtr getConnection(const ConnectionTimeouts & timeouts) override
|
||||
{
|
||||
Poco::Timestamp now;
|
||||
std::vector<ConnectionPtr> expired_connections;
|
||||
|
||||
SCOPE_EXIT({
|
||||
@ -494,8 +519,9 @@ public:
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
expired_connections.reserve(stored_connections.size());
|
||||
|
||||
wipeExpiredImpl(expired_connections, now);
|
||||
wipeExpiredImpl(expired_connections);
|
||||
|
||||
if (!stored_connections.empty())
|
||||
{
|
||||
@ -526,7 +552,6 @@ public:
|
||||
|
||||
size_t wipeExpired() override
|
||||
{
|
||||
Poco::Timestamp now;
|
||||
std::vector<ConnectionPtr> expired_connections;
|
||||
|
||||
SCOPE_EXIT({
|
||||
@ -535,25 +560,29 @@ public:
|
||||
});
|
||||
|
||||
std::lock_guard lock(mutex);
|
||||
return wipeExpiredImpl(expired_connections, now);
|
||||
return wipeExpiredImpl(expired_connections);
|
||||
}
|
||||
|
||||
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections, Poco::Timestamp now) TSA_REQUIRES(mutex)
|
||||
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections) TSA_REQUIRES(mutex)
|
||||
{
|
||||
SCOPE_EXIT({
|
||||
CurrentMetrics::sub(getMetrics().stored_count, expired_connections.size());
|
||||
ProfileEvents::increment(getMetrics().expired, expired_connections.size());
|
||||
});
|
||||
|
||||
auto isSoftLimitReached = group->isSoftLimitReached();
|
||||
while (!stored_connections.empty())
|
||||
{
|
||||
auto connection = stored_connections.top();
|
||||
|
||||
if (!isExpired(now, connection))
|
||||
if (!isExpired(connection, isSoftLimitReached))
|
||||
return stored_connections.size();
|
||||
|
||||
stored_connections.pop();
|
||||
connection->markAsExpired();
|
||||
expired_connections.push_back(connection);
|
||||
}
|
||||
|
||||
CurrentMetrics::sub(getMetrics().stored_count, expired_connections.size());
|
||||
ProfileEvents::increment(getMetrics().expired, expired_connections.size());
|
||||
|
||||
return stored_connections.size();
|
||||
}
|
||||
|
||||
@ -569,57 +598,53 @@ private:
|
||||
|
||||
WeakPtr getWeakFromThis() { return EndpointConnectionPool::weak_from_this(); }
|
||||
|
||||
bool isExpired(Poco::Timestamp & now, ConnectionPtr connection)
|
||||
bool isExpired(ConnectionPtr connection, bool isSoftLimitReached) TSA_REQUIRES(mutex)
|
||||
{
|
||||
if (group->isSoftLimitReached())
|
||||
return now > (connection->getLastRequest() + divide(connection->getKeepAliveTimeout(), 10));
|
||||
return now > connection->getLastRequest() + connection->getKeepAliveTimeout();
|
||||
if (isSoftLimitReached)
|
||||
return connection->isKeepAliveExpired(0.1);
|
||||
return connection->isKeepAliveExpired(0.8);
|
||||
}
|
||||
|
||||
ConnectionPtr allocateNewConnection()
|
||||
|
||||
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), getMetrics(), host, port);
|
||||
auto connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
|
||||
|
||||
connection->setKeepAlive(true);
|
||||
setTimeouts(*connection, timeouts);
|
||||
|
||||
if (!proxy_configuration.isEmpty())
|
||||
{
|
||||
connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));
|
||||
}
|
||||
|
||||
group->atConnectionCreate();
|
||||
|
||||
return connection;
|
||||
}
|
||||
|
||||
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
auto address = HostResolversPool::instance().getResolver(host)->resolve();
|
||||
|
||||
auto session = allocateNewConnection();
|
||||
|
||||
setTimeouts(*session, timeouts);
|
||||
session->setResolvedHost(*address);
|
||||
connection->setResolvedHost(*address);
|
||||
|
||||
try
|
||||
{
|
||||
auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds);
|
||||
session->doConnect();
|
||||
connection->doConnect();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
address.setFail();
|
||||
ProfileEvents::increment(getMetrics().errors);
|
||||
session->reset();
|
||||
connection->reset();
|
||||
throw;
|
||||
}
|
||||
|
||||
ProfileEvents::increment(getMetrics().created);
|
||||
return session;
|
||||
return connection;
|
||||
}
|
||||
|
||||
void atConnectionDestroy(PooledConnection & connection)
|
||||
{
|
||||
group->atConnectionDestroy();
|
||||
if (connection.getKeepAliveRequest() >= connection.getKeepAliveMaxRequests())
|
||||
{
|
||||
ProfileEvents::increment(getMetrics().expired, 1);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
|
||||
|| group->isStoreLimitReached())
|
||||
@ -628,17 +653,17 @@ private:
|
||||
return;
|
||||
}
|
||||
|
||||
auto connection_to_store = allocateNewConnection();
|
||||
auto connection_to_store = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
|
||||
connection_to_store->assign(connection);
|
||||
|
||||
CurrentMetrics::add(getMetrics().stored_count, 1);
|
||||
ProfileEvents::increment(getMetrics().preserved, 1);
|
||||
|
||||
{
|
||||
MemoryTrackerSwitcher switcher{&total_memory_tracker};
|
||||
std::lock_guard lock(mutex);
|
||||
stored_connections.push(connection_to_store);
|
||||
}
|
||||
|
||||
CurrentMetrics::add(getMetrics().stored_count, 1);
|
||||
ProfileEvents::increment(getMetrics().preserved, 1);
|
||||
}
|
||||
|
||||
|
||||
@ -726,14 +751,13 @@ createConnectionPool(ConnectionGroup::Ptr group, std::string host, UInt16 port,
|
||||
class HTTPConnectionPools::Impl
|
||||
{
|
||||
private:
|
||||
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 5 * 60;
|
||||
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 10 * 60;
|
||||
const Poco::Timespan wipe_timeout = Poco::Timespan(DEFAULT_WIPE_TIMEOUT_SECONDS, 0);
|
||||
|
||||
ConnectionGroup::Ptr disk_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::DISK);
|
||||
ConnectionGroup::Ptr storage_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::STORAGE);
|
||||
ConnectionGroup::Ptr http_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::HTTP);
|
||||
|
||||
|
||||
/// If multiple mutexes are held simultaneously,
|
||||
/// they should be locked in this order:
|
||||
/// HTTPConnectionPools::mutex, then EndpointConnectionPool::mutex, then ConnectionGroup::mutex.
|
||||
|
@ -2,7 +2,6 @@
|
||||
#include <Common/HTTPConnectionPool.h>
|
||||
|
||||
#include <Poco/URI.h>
|
||||
#include <Poco/Net/ServerSocket.h>
|
||||
#include <Poco/Net/MessageHeader.h>
|
||||
#include <Poco/Net/HTTPServerRequest.h>
|
||||
#include <Poco/Net/HTTPServerResponse.h>
|
||||
@ -17,6 +16,40 @@
|
||||
namespace
|
||||
{
|
||||
|
||||
template<class T>
|
||||
class SafeHandler
|
||||
{
|
||||
public:
|
||||
using Ptr = std::shared_ptr<SafeHandler<T>>;
|
||||
|
||||
SafeHandler() = default;
|
||||
SafeHandler(SafeHandler<T>&) = delete;
|
||||
SafeHandler& operator=(SafeHandler<T>&) = delete;
|
||||
|
||||
T get()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return obj;
|
||||
}
|
||||
|
||||
void set(T && options_)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
obj = std::move(options_);
|
||||
}
|
||||
|
||||
protected:
|
||||
std::mutex mutex;
|
||||
T obj = {};
|
||||
};
|
||||
|
||||
struct RequestOptions
|
||||
{
|
||||
size_t slowdown_receive = 0;
|
||||
int overwrite_keep_alive_timeout = 0;
|
||||
int overwrite_keep_alive_max_requests = 10;
|
||||
};
|
||||
|
||||
size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits<size_t>::max())
|
||||
{
|
||||
const size_t buffer_size = 4096;
|
||||
@ -47,13 +80,21 @@ size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count =
|
||||
class MockRequestHandler : public Poco::Net::HTTPRequestHandler
|
||||
{
|
||||
public:
|
||||
explicit MockRequestHandler(std::shared_ptr<std::atomic<size_t>> slowdown_)
|
||||
: slowdown(std::move(slowdown_))
|
||||
explicit MockRequestHandler(SafeHandler<RequestOptions>::Ptr options_)
|
||||
: options(options_)
|
||||
{
|
||||
}
|
||||
|
||||
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
|
||||
{
|
||||
int value = request.getKeepAliveTimeout();
|
||||
ASSERT_GT(value, 0);
|
||||
|
||||
auto params = options->get();
|
||||
|
||||
if (params.overwrite_keep_alive_timeout > 0)
|
||||
response.setKeepAliveTimeout(params.overwrite_keep_alive_timeout, params.overwrite_keep_alive_max_requests);
|
||||
|
||||
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
|
||||
auto size = request.getContentLength();
|
||||
if (size > 0)
|
||||
@ -61,28 +102,29 @@ public:
|
||||
else
|
||||
response.setChunkedTransferEncoding(true); // or chunk encoding
|
||||
|
||||
sleepForSeconds(*slowdown);
|
||||
if (params.slowdown_receive > 0)
|
||||
sleepForSeconds(params.slowdown_receive);
|
||||
|
||||
stream_copy_n(request.stream(), response.send(), size);
|
||||
}
|
||||
|
||||
std::shared_ptr<std::atomic<size_t>> slowdown;
|
||||
SafeHandler<RequestOptions>::Ptr options;
|
||||
};
|
||||
|
||||
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
|
||||
{
|
||||
public:
|
||||
explicit HTTPRequestHandlerFactory(std::shared_ptr<std::atomic<size_t>> slowdown_)
|
||||
: slowdown(std::move(slowdown_))
|
||||
explicit HTTPRequestHandlerFactory(SafeHandler<RequestOptions>::Ptr options_)
|
||||
: options(options_)
|
||||
{
|
||||
}
|
||||
|
||||
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest &) override
|
||||
{
|
||||
return new MockRequestHandler(slowdown);
|
||||
return new MockRequestHandler(options);
|
||||
}
|
||||
|
||||
std::shared_ptr<std::atomic<size_t>> slowdown;
|
||||
SafeHandler<RequestOptions>::Ptr options;
|
||||
};
|
||||
|
||||
}
|
||||
@ -94,6 +136,8 @@ class ConnectionPoolTest : public testing::Test {
|
||||
protected:
|
||||
ConnectionPoolTest()
|
||||
{
|
||||
options = std::make_shared<SafeHandler<RequestOptions>>();
|
||||
|
||||
startServer();
|
||||
}
|
||||
|
||||
@ -102,7 +146,7 @@ protected:
|
||||
DB::HTTPConnectionPools::Limits def_limits{};
|
||||
DB::HTTPConnectionPools::instance().setLimits(def_limits, def_limits, def_limits);
|
||||
|
||||
setSlowDown(0);
|
||||
options->set(RequestOptions());
|
||||
|
||||
DB::HTTPConnectionPools::instance().dropCache();
|
||||
DB::CurrentThread::getProfileEvents().reset();
|
||||
@ -129,7 +173,7 @@ protected:
|
||||
void startServer()
|
||||
{
|
||||
server_data.reset();
|
||||
server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive);
|
||||
server_data.handler_factory = new HTTPRequestHandlerFactory(options);
|
||||
server_data.server = std::make_unique<Poco::Net::HTTPServer>(
|
||||
server_data.handler_factory, server_data.port);
|
||||
|
||||
@ -143,11 +187,21 @@ protected:
|
||||
|
||||
void setSlowDown(size_t seconds)
|
||||
{
|
||||
*slowdown_receive = seconds;
|
||||
auto opt = options->get();
|
||||
opt.slowdown_receive = seconds;
|
||||
options->set(std::move(opt));
|
||||
}
|
||||
|
||||
void setOverWriteKeepAlive(size_t seconds, int max_requests)
|
||||
{
|
||||
auto opt = options->get();
|
||||
opt.overwrite_keep_alive_timeout = int(seconds);
|
||||
opt.overwrite_keep_alive_max_requests= max_requests;
|
||||
options->set(std::move(opt));
|
||||
}
|
||||
|
||||
DB::ConnectionTimeouts timeouts;
|
||||
std::shared_ptr<std::atomic<size_t>> slowdown_receive = std::make_shared<std::atomic<size_t>>(0);
|
||||
SafeHandler<RequestOptions>::Ptr options;
|
||||
|
||||
struct ServerData
|
||||
{
|
||||
@ -182,7 +236,7 @@ protected:
|
||||
void wait_until(std::function<bool()> pred)
|
||||
{
|
||||
while (!pred())
|
||||
sleepForMilliseconds(250);
|
||||
sleepForMilliseconds(10);
|
||||
}
|
||||
|
||||
void echoRequest(String data, HTTPSession & session)
|
||||
@ -245,45 +299,52 @@ TEST_F(ConnectionPoolTest, CanRequest)
|
||||
ASSERT_EQ(0, getServer().currentConnections());
|
||||
ASSERT_EQ(1, getServer().totalConnections());
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanPreserve)
|
||||
{
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 1; });
|
||||
ASSERT_EQ(1, getServer().currentConnections());
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanReuse)
|
||||
{
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
// DB::setReuseTag(*connection);
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 1; });
|
||||
ASSERT_EQ(1, getServer().currentConnections());
|
||||
@ -293,6 +354,11 @@ TEST_F(ConnectionPoolTest, CanReuse)
|
||||
ASSERT_EQ(1, getServer().totalConnections());
|
||||
ASSERT_EQ(1, getServer().currentConnections());
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
|
||||
connection->reset();
|
||||
}
|
||||
|
||||
@ -303,15 +369,16 @@ TEST_F(ConnectionPoolTest, CanReuse)
|
||||
ASSERT_EQ(0, getServer().currentConnections());
|
||||
ASSERT_EQ(1, getServer().totalConnections());
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanReuse10)
|
||||
{
|
||||
auto pool = getPool();
|
||||
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
for (int i = 0; i < 10; ++i)
|
||||
{
|
||||
@ -328,16 +395,23 @@ TEST_F(ConnectionPoolTest, CanReuse10)
|
||||
ASSERT_EQ(0, getServer().currentConnections());
|
||||
ASSERT_EQ(1, getServer().totalConnections());
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanReuse5)
|
||||
{
|
||||
timeouts.withHTTPKeepAliveTimeout(1);
|
||||
auto ka = Poco::Timespan(1, 0); // 1 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
std::vector<DB::HTTPSessionPtr> connections;
|
||||
connections.reserve(5);
|
||||
@ -347,11 +421,14 @@ TEST_F(ConnectionPoolTest, CanReuse5)
|
||||
}
|
||||
connections.clear();
|
||||
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 5; });
|
||||
ASSERT_EQ(5, getServer().currentConnections());
|
||||
@ -363,35 +440,56 @@ TEST_F(ConnectionPoolTest, CanReuse5)
|
||||
echoRequest("Hello", *connection);
|
||||
}
|
||||
|
||||
ASSERT_EQ(5, getServer().totalConnections());
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
/// wait until all connections are timeouted
|
||||
wait_until([&] () { return getServer().currentConnections() == 0; });
|
||||
|
||||
{
|
||||
// just to trigger pool->wipeExpired();
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
connection->reset();
|
||||
}
|
||||
|
||||
ASSERT_EQ(6, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
|
||||
{
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
std::vector<HTTPSessionPtr> in_use;
|
||||
|
||||
const size_t count = 2;
|
||||
const size_t count = 3;
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
// DB::setReuseTag(*connection);
|
||||
in_use.push_back(connection);
|
||||
}
|
||||
|
||||
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(count, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
auto connection = std::move(in_use.back());
|
||||
in_use.pop_back();
|
||||
@ -402,28 +500,39 @@ TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
|
||||
|
||||
echoRequest("Hello", *connection);
|
||||
|
||||
connection->reset();
|
||||
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 1; });
|
||||
ASSERT_EQ(1, getServer().currentConnections());
|
||||
ASSERT_EQ(count+1, getServer().totalConnections());
|
||||
|
||||
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
|
||||
{
|
||||
auto ka = Poco::Timespan(1, 0); // 1 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
std::vector<HTTPSessionPtr> in_use;
|
||||
|
||||
const size_t count = 2;
|
||||
const size_t count = 3;
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
/// make some request in order to show to the server the keep alive headers
|
||||
echoRequest("Hello", *connection);
|
||||
in_use.push_back(std::move(connection));
|
||||
}
|
||||
in_use.clear();
|
||||
|
||||
for (int i = 0; i < count; ++i)
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
// DB::setReuseTag(*connection);
|
||||
in_use.push_back(std::move(connection));
|
||||
}
|
||||
|
||||
@ -441,11 +550,16 @@ TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 0; });
|
||||
ASSERT_EQ(0, getServer().currentConnections());
|
||||
ASSERT_EQ(2, getServer().totalConnections());
|
||||
ASSERT_EQ(count, getServer().totalConnections());
|
||||
|
||||
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(count + count - 1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(count + 1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(count-1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(count-2, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ReceiveTimeout)
|
||||
@ -454,6 +568,7 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
|
||||
timeouts.withReceiveTimeout(1);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
@ -462,10 +577,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
|
||||
);
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
{
|
||||
timeouts.withReceiveTimeout(3);
|
||||
@ -475,10 +594,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
|
||||
);
|
||||
}
|
||||
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
{
|
||||
/// timeouts have effect for reused session
|
||||
@ -489,10 +612,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
|
||||
);
|
||||
}
|
||||
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
|
||||
@ -500,6 +627,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
|
||||
std::string_view message = "Hello ReadWriteBufferFromHTTP";
|
||||
auto uri = Poco::URI(getServerUrl());
|
||||
auto metrics = DB::HTTPConnectionPools::instance().getPool(DB::HTTPConnectionGroupType::HTTP, uri, DB::ProxyConfiguration{})->getMetrics();
|
||||
|
||||
Poco::Net::HTTPBasicCredentials empty_creds;
|
||||
auto buf_from_http = DB::BuilderRWBufferFromHTTP(uri)
|
||||
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
|
||||
@ -527,6 +655,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
@ -538,23 +667,26 @@ TEST_F(ConnectionPoolTest, HardLimit)
|
||||
DB::HTTPConnectionPools::instance().setLimits(zero_limits, zero_limits, zero_limits);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
}
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, NoReceiveCall)
|
||||
{
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
@ -570,11 +702,209 @@ TEST_F(ConnectionPoolTest, NoReceiveCall)
|
||||
connection->flushRequest();
|
||||
}
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsHoldTooLong)
|
||||
{
|
||||
auto ka = Poco::Timespan(1, 0); // 1 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
|
||||
echoRequest("Hello", *connection);
|
||||
|
||||
auto fake_ka = Poco::Timespan(30 * 1000 * 1000); // 30 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(fake_ka);
|
||||
DB::setTimeouts(*connection, timeouts); // new keep alive timeout has no effect
|
||||
|
||||
wait_until([&] () { return getServer().currentConnections() == 0; });
|
||||
|
||||
ASSERT_EQ(1, connection->connected());
|
||||
ASSERT_EQ(1, connection->getKeepAlive());
|
||||
ASSERT_EQ(1000, connection->getKeepAliveTimeout().totalMilliseconds());
|
||||
ASSERT_EQ(1, connection->isKeepAliveExpired(connection->getKeepAliveReliability()));
|
||||
|
||||
echoRequest("Hello", *connection);
|
||||
}
|
||||
|
||||
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsNearlyExpired)
|
||||
{
|
||||
auto ka = Poco::Timespan(1, 0); // 1 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
}
|
||||
|
||||
sleepForMilliseconds(900);
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
|
||||
{
|
||||
auto ka = Poco::Timespan(30, 0); // 30 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
||||
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||
}
|
||||
|
||||
{
|
||||
setOverWriteKeepAlive(1, 10);
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
||||
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
|
||||
}
|
||||
|
||||
{
|
||||
// server do not overwrite it in the following requests but client has to remember last agreed value
|
||||
setOverWriteKeepAlive(0, 0);
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
||||
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
TEST_F(ConnectionPoolTest, MaxRequests)
|
||||
{
|
||||
auto ka = Poco::Timespan(30, 0); // 30 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
auto max_requests = 5;
|
||||
timeouts.http_keep_alive_max_requests = max_requests;
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
for (int i = 1; i <= max_requests - 1; ++i)
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||
ASSERT_EQ(i, connection->getKeepAliveRequest());
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(max_requests-2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||
ASSERT_EQ(max_requests, connection->getKeepAliveRequest());
|
||||
}
|
||||
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
||||
|
||||
TEST_F(ConnectionPoolTest, ServerOverwriteMaxRequests)
|
||||
{
|
||||
auto ka = Poco::Timespan(30, 0); // 30 seconds
|
||||
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||
|
||||
auto pool = getPool();
|
||||
auto metrics = pool->getMetrics();
|
||||
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||
ASSERT_EQ(1000, connection->getKeepAliveMaxRequests());
|
||||
ASSERT_EQ(1, connection->getKeepAliveRequest());
|
||||
}
|
||||
|
||||
auto max_requests = 3;
|
||||
setOverWriteKeepAlive(5, max_requests);
|
||||
|
||||
for (int i = 2; i <= 10*max_requests; ++i)
|
||||
{
|
||||
auto connection = pool->getConnection(timeouts);
|
||||
echoRequest("Hello", *connection);
|
||||
ASSERT_EQ(5, connection->getKeepAliveTimeout().totalSeconds());
|
||||
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||
ASSERT_EQ(((i-1) % max_requests) + 1, connection->getKeepAliveRequest());
|
||||
}
|
||||
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||
}
|
||||
|
@ -54,6 +54,7 @@ static constexpr auto DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT = 15;
|
||||
|
||||
static constexpr auto DEFAULT_TCP_KEEP_ALIVE_TIMEOUT = 290;
|
||||
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30;
|
||||
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST = 1000;
|
||||
|
||||
static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/";
|
||||
|
||||
|
@ -128,9 +128,9 @@ namespace DB
|
||||
M(Bool, format_alter_operations_with_parentheses, false, "If enabled, each operation in alter queries will be surrounded with parentheses in formatted queries to make them less ambiguous.", 0) \
|
||||
M(String, default_replica_path, "/clickhouse/tables/{uuid}/{shard}", "The path to the table in ZooKeeper", 0) \
|
||||
M(String, default_replica_name, "{replica}", "The replica name in ZooKeeper", 0) \
|
||||
M(UInt64, disk_connections_soft_limit, 1000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
|
||||
M(UInt64, disk_connections_soft_limit, 5000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
|
||||
M(UInt64, disk_connections_warn_limit, 10000, "Warning massages are written to the logs if number of in-use connections are higher than this limit. The limit applies to the disks connections.", 0) \
|
||||
M(UInt64, disk_connections_store_limit, 12000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
|
||||
M(UInt64, disk_connections_store_limit, 30000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
|
||||
M(UInt64, storage_connections_soft_limit, 100, "Connections above this limit have significantly shorter time to live. The limit applies to the storages connections.", 0) \
|
||||
M(UInt64, storage_connections_warn_limit, 1000, "Warning massages are written to the logs if number of in-use connections are higher than this limit. The limit applies to the storages connections.", 0) \
|
||||
M(UInt64, storage_connections_store_limit, 5000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the storages connections.", 0) \
|
||||
|
@ -76,6 +76,9 @@ std::unique_ptr<S3::Client> getClient(
|
||||
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS);
|
||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS);
|
||||
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS);
|
||||
client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", S3::DEFAULT_KEEP_ALIVE_TIMEOUT);
|
||||
client_configuration.http_keep_alive_max_requests = config.getUInt(config_prefix + ".http_keep_alive_max_requests", S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS);
|
||||
|
||||
client_configuration.endpointOverride = uri.endpoint;
|
||||
client_configuration.s3_use_adaptive_timeouts = config.getBool(
|
||||
config_prefix + ".use_adaptive_timeouts", client_configuration.s3_use_adaptive_timeouts);
|
||||
|
@ -144,7 +144,12 @@ ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method
|
||||
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
|
||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||
/// we can not change keep alive timeout for already initiated connections
|
||||
if (!session.connected())
|
||||
{
|
||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||
session.setKeepAliveMaxRequests(int(timeouts.http_keep_alive_max_requests));
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionTimeouts getTimeouts(const Poco::Net::HTTPClientSession & session)
|
||||
|
@ -35,6 +35,8 @@ struct ConnectionTimeouts
|
||||
|
||||
Poco::Timespan tcp_keep_alive_timeout = Poco::Timespan(DEFAULT_TCP_KEEP_ALIVE_TIMEOUT, 0);
|
||||
Poco::Timespan http_keep_alive_timeout = Poco::Timespan(DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, 0);
|
||||
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
|
||||
|
||||
|
||||
/// Timeouts for HedgedConnections
|
||||
Poco::Timespan hedged_connection_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0);
|
||||
@ -69,6 +71,7 @@ APPLY_FOR_ALL_CONNECTION_TIMEOUT_MEMBERS(DECLARE_BUILDER_FOR_MEMBER)
|
||||
|
||||
ConnectionTimeouts & withConnectionTimeout(size_t seconds);
|
||||
ConnectionTimeouts & withConnectionTimeout(Poco::Timespan span);
|
||||
ConnectionTimeouts & withHTTPKeepAliveMaxRequests(size_t requests);
|
||||
};
|
||||
|
||||
/// NOLINTBEGIN(bugprone-macro-parentheses)
|
||||
@ -114,6 +117,12 @@ inline ConnectionTimeouts & ConnectionTimeouts::withConnectionTimeout(Poco::Time
|
||||
return *this;
|
||||
}
|
||||
|
||||
inline ConnectionTimeouts & ConnectionTimeouts::withHTTPKeepAliveMaxRequests(size_t requests)
|
||||
{
|
||||
http_keep_alive_max_requests = requests;
|
||||
return *this;
|
||||
}
|
||||
|
||||
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts);
|
||||
ConnectionTimeouts getTimeouts(const Poco::Net::HTTPClientSession & session);
|
||||
|
||||
|
@ -96,9 +96,9 @@ bool isS3ExpressEndpoint(const std::string & endpoint);
|
||||
|
||||
struct ClientSettings
|
||||
{
|
||||
bool use_virtual_addressing;
|
||||
bool use_virtual_addressing = false;
|
||||
/// Disable checksum to avoid extra read of the input stream
|
||||
bool disable_checksum;
|
||||
bool disable_checksum = false;
|
||||
/// Should client send ComposeObject request after upload to GCS.
|
||||
///
|
||||
/// Previously ComposeObject request was required to make Copy possible,
|
||||
@ -108,8 +108,8 @@ struct ClientSettings
|
||||
///
|
||||
/// Ability to enable it preserved since likely it is required for old
|
||||
/// files.
|
||||
bool gcs_issue_compose_request;
|
||||
bool is_s3express_bucket;
|
||||
bool gcs_issue_compose_request = false;
|
||||
bool is_s3express_bucket = false;
|
||||
};
|
||||
|
||||
/// Client that improves the client from the AWS SDK
|
||||
|
@ -22,6 +22,8 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
|
||||
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
|
||||
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
|
||||
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100;
|
||||
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
|
||||
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100;
|
||||
|
||||
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
|
||||
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
|
||||
|
@ -146,7 +146,9 @@ ConnectionTimeouts getTimeoutsFromConfiguration(const PocoHTTPClientConfiguratio
|
||||
.withSendTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
|
||||
.withReceiveTimeout(Poco::Timespan(client_configuration.requestTimeoutMs * 1000))
|
||||
.withTCPKeepAliveTimeout(Poco::Timespan(
|
||||
client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0));
|
||||
client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0))
|
||||
.withHTTPKeepAliveTimeout(Poco::Timespan(client_configuration.http_keep_alive_timeout, 0))
|
||||
.withHTTPKeepAliveMaxRequests(client_configuration.http_keep_alive_max_requests);
|
||||
}
|
||||
|
||||
PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_configuration)
|
||||
|
@ -51,6 +51,8 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
||||
|
||||
/// See PoolBase::BehaviourOnLimit
|
||||
bool s3_use_adaptive_timeouts = true;
|
||||
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
|
||||
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
|
||||
|
||||
std::function<void(const DB::ProxyConfiguration &)> error_report;
|
||||
|
||||
|
@ -159,7 +159,7 @@ void testServerSideEncryption(
|
||||
DB::S3::CredentialsConfiguration
|
||||
{
|
||||
.use_environment_credentials = use_environment_credentials,
|
||||
.use_insecure_imds_request = use_insecure_imds_request
|
||||
.use_insecure_imds_request = use_insecure_imds_request,
|
||||
}
|
||||
);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user