mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 00:30:49 +00:00
support max requests for keep alive
This commit is contained in:
parent
5cab8d185f
commit
ae3a199939
@ -213,6 +213,12 @@ namespace Net
|
|||||||
Poco::Timespan getKeepAliveTimeout() const;
|
Poco::Timespan getKeepAliveTimeout() const;
|
||||||
/// Returns the connection timeout for HTTP connections.
|
/// Returns the connection timeout for HTTP connections.
|
||||||
|
|
||||||
|
void setKeepAliveMaxRequests(int max_requests);
|
||||||
|
|
||||||
|
int getKeepAliveMaxRequests() const;
|
||||||
|
|
||||||
|
int getKeepAliveRequest() const;
|
||||||
|
|
||||||
bool isKeepAliveExpired(double reliability = 1.0) const;
|
bool isKeepAliveExpired(double reliability = 1.0) const;
|
||||||
/// Returns if the connection is expired with some margin as fraction of timeout as reliability
|
/// Returns if the connection is expired with some margin as fraction of timeout as reliability
|
||||||
|
|
||||||
@ -352,6 +358,8 @@ namespace Net
|
|||||||
|
|
||||||
void assign(HTTPClientSession & session);
|
void assign(HTTPClientSession & session);
|
||||||
|
|
||||||
|
void setKeepAliveRequest(int request);
|
||||||
|
|
||||||
HTTPSessionFactory _proxySessionFactory;
|
HTTPSessionFactory _proxySessionFactory;
|
||||||
/// Factory to create HTTPClientSession to proxy.
|
/// Factory to create HTTPClientSession to proxy.
|
||||||
private:
|
private:
|
||||||
@ -360,6 +368,8 @@ namespace Net
|
|||||||
Poco::UInt16 _port;
|
Poco::UInt16 _port;
|
||||||
ProxyConfig _proxyConfig;
|
ProxyConfig _proxyConfig;
|
||||||
Poco::Timespan _keepAliveTimeout;
|
Poco::Timespan _keepAliveTimeout;
|
||||||
|
int _keepAliveCurrentRequest = 0;
|
||||||
|
int _keepAliveMaxRequests = 1000;
|
||||||
Poco::Timestamp _lastRequest;
|
Poco::Timestamp _lastRequest;
|
||||||
bool _reconnect;
|
bool _reconnect;
|
||||||
bool _mustReconnect;
|
bool _mustReconnect;
|
||||||
@ -463,6 +473,16 @@ namespace Net
|
|||||||
return _defaultKeepAliveReliabilityLevel;
|
return _defaultKeepAliveReliabilityLevel;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
inline int HTTPClientSession::getKeepAliveMaxRequests() const
|
||||||
|
{
|
||||||
|
return _keepAliveMaxRequests;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline int HTTPClientSession::getKeepAliveRequest() const
|
||||||
|
{
|
||||||
|
return _keepAliveCurrentRequest;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
} // namespace Poco::Net
|
} // namespace Poco::Net
|
||||||
|
|
||||||
|
@ -120,8 +120,9 @@ namespace Net
|
|||||||
/// The value is set to "Keep-Alive" if keepAlive is
|
/// The value is set to "Keep-Alive" if keepAlive is
|
||||||
/// true, or to "Close" otherwise.
|
/// true, or to "Close" otherwise.
|
||||||
|
|
||||||
void setKeepAliveTimeout(int timeout);
|
void setKeepAliveTimeout(int timeout, int max_requests);
|
||||||
int getKeepAliveTimeout() const;
|
int getKeepAliveTimeout() const;
|
||||||
|
int getKeepAliveMaxRequests() const;
|
||||||
|
|
||||||
bool getKeepAlive() const;
|
bool getKeepAlive() const;
|
||||||
/// Returns true if
|
/// Returns true if
|
||||||
|
@ -230,7 +230,25 @@ void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout)
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void HTTPClientSession::setLastRequest(Poco::Timestamp time)
|
void HTTPClientSession::setKeepAliveMaxRequests(int max_requests)
|
||||||
|
{
|
||||||
|
if (connected())
|
||||||
|
{
|
||||||
|
throw Poco::IllegalStateException("cannot change keep alive max requests on initiated connection, "
|
||||||
|
"That value is managed privately after connection is established.");
|
||||||
|
}
|
||||||
|
_keepAliveMaxRequests = max_requests;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void HTTPClientSession::setKeepAliveRequest(int request)
|
||||||
|
{
|
||||||
|
_keepAliveCurrentRequest = request;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
void HTTPClientSession::setLastRequest(Poco::Timestamp time)
|
||||||
{
|
{
|
||||||
if (connected())
|
if (connected())
|
||||||
{
|
{
|
||||||
@ -248,6 +266,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
|
|||||||
clearException();
|
clearException();
|
||||||
_responseReceived = false;
|
_responseReceived = false;
|
||||||
|
|
||||||
|
_keepAliveCurrentRequest += 1;
|
||||||
|
|
||||||
bool keepAlive = getKeepAlive();
|
bool keepAlive = getKeepAlive();
|
||||||
if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty())
|
if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty())
|
||||||
{
|
{
|
||||||
@ -261,7 +281,7 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
|
|||||||
if (!request.has(HTTPMessage::CONNECTION))
|
if (!request.has(HTTPMessage::CONNECTION))
|
||||||
request.setKeepAlive(keepAlive);
|
request.setKeepAlive(keepAlive);
|
||||||
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
|
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
|
||||||
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds());
|
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds(), _keepAliveMaxRequests);
|
||||||
if (!request.has(HTTPRequest::HOST) && !_host.empty())
|
if (!request.has(HTTPRequest::HOST) && !_host.empty())
|
||||||
request.setHost(_host, _port);
|
request.setHost(_host, _port);
|
||||||
if (!_proxyConfig.host.empty() && !bypassProxy())
|
if (!_proxyConfig.host.empty() && !bypassProxy())
|
||||||
@ -349,6 +369,9 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response)
|
|||||||
auto timeout = response.getKeepAliveTimeout();
|
auto timeout = response.getKeepAliveTimeout();
|
||||||
if (timeout > 0)
|
if (timeout > 0)
|
||||||
_keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0));
|
_keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0));
|
||||||
|
auto max_requests = response.getKeepAliveMaxRequests();
|
||||||
|
if (max_requests > 0)
|
||||||
|
_keepAliveMaxRequests = std::min(_keepAliveMaxRequests, max_requests);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
|
if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
|
||||||
@ -460,7 +483,8 @@ std::string HTTPClientSession::proxyRequestPrefix() const
|
|||||||
bool HTTPClientSession::isKeepAliveExpired(double reliability) const
|
bool HTTPClientSession::isKeepAliveExpired(double reliability) const
|
||||||
{
|
{
|
||||||
Poco::Timestamp now;
|
Poco::Timestamp now;
|
||||||
return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest;
|
return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest
|
||||||
|
|| _keepAliveCurrentRequest > _keepAliveMaxRequests;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool HTTPClientSession::mustReconnect() const
|
bool HTTPClientSession::mustReconnect() const
|
||||||
@ -551,6 +575,9 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session)
|
|||||||
setLastRequest(session.getLastRequest());
|
setLastRequest(session.getLastRequest());
|
||||||
setKeepAliveTimeout(session.getKeepAliveTimeout());
|
setKeepAliveTimeout(session.getKeepAliveTimeout());
|
||||||
|
|
||||||
|
_keepAliveMaxRequests = session._keepAliveMaxRequests;
|
||||||
|
_keepAliveCurrentRequest = session._keepAliveCurrentRequest;
|
||||||
|
|
||||||
attachSocket(session.detachSocket());
|
attachSocket(session.detachSocket());
|
||||||
|
|
||||||
session.reset();
|
session.reset();
|
||||||
|
@ -180,27 +180,25 @@ bool HTTPMessage::getKeepAlive() const
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void HTTPMessage::setKeepAliveTimeout(int timeout)
|
void HTTPMessage::setKeepAliveTimeout(int timeout, int max_requests)
|
||||||
{
|
{
|
||||||
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max=1000", timeout));
|
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max={}", timeout, max_requests));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int parseTimeoutFromHeaderValue(const std::string_view header_value)
|
int parseFromHeaderValues(const std::string_view header_value, const std::string_view param_name)
|
||||||
{
|
{
|
||||||
static const std::string_view timeout_param = "timeout=";
|
auto param_value_pos = header_value.find(param_name);
|
||||||
|
if (param_value_pos == std::string::npos)
|
||||||
|
param_value_pos = header_value.size();
|
||||||
|
if (param_value_pos != header_value.size())
|
||||||
|
param_value_pos += param_name.size();
|
||||||
|
|
||||||
auto timeout_pos = header_value.find(timeout_param);
|
auto param_value_end = header_value.find(',', param_value_pos);
|
||||||
if (timeout_pos == std::string::npos)
|
if (param_value_end == std::string::npos)
|
||||||
timeout_pos = header_value.size();
|
param_value_end = header_value.size();
|
||||||
if (timeout_pos != header_value.size())
|
|
||||||
timeout_pos += timeout_param.size();
|
|
||||||
|
|
||||||
auto timeout_end = header_value.find(',', timeout_pos);
|
auto timeout_value_substr = header_value.substr(param_value_pos, param_value_end - param_value_pos);
|
||||||
if (timeout_end == std::string::npos)
|
|
||||||
timeout_end = header_value.size();
|
|
||||||
|
|
||||||
auto timeout_value_substr = header_value.substr(timeout_pos, timeout_end - timeout_pos);
|
|
||||||
if (timeout_value_substr.empty())
|
if (timeout_value_substr.empty())
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
@ -217,7 +215,16 @@ int parseTimeoutFromHeaderValue(const std::string_view header_value)
|
|||||||
int HTTPMessage::getKeepAliveTimeout() const
|
int HTTPMessage::getKeepAliveTimeout() const
|
||||||
{
|
{
|
||||||
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
|
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
|
||||||
return parseTimeoutFromHeaderValue(ka_header);
|
static const std::string_view timeout_param = "timeout=";
|
||||||
|
return parseFromHeaderValues(ka_header, timeout_param);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int HTTPMessage::getKeepAliveMaxRequests() const
|
||||||
|
{
|
||||||
|
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
|
||||||
|
static const std::string_view timeout_param = "max=";
|
||||||
|
return parseFromHeaderValues(ka_header, timeout_param);
|
||||||
}
|
}
|
||||||
|
|
||||||
} } // namespace Poco::Net
|
} } // namespace Poco::Net
|
||||||
|
@ -301,6 +301,8 @@ private:
|
|||||||
auto timeouts = getTimeouts(*this);
|
auto timeouts = getTimeouts(*this);
|
||||||
auto new_connection = lock->getConnection(timeouts);
|
auto new_connection = lock->getConnection(timeouts);
|
||||||
Session::assign(*new_connection);
|
Session::assign(*new_connection);
|
||||||
|
if (Session::getKeepAliveRequest() == 0)
|
||||||
|
Session::setKeepAliveRequest(1);
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -322,7 +324,8 @@ private:
|
|||||||
Session::getPort());
|
Session::getPort());
|
||||||
}
|
}
|
||||||
|
|
||||||
Poco::Timespan idleTime() {
|
Poco::Timespan idleTime()
|
||||||
|
{
|
||||||
Poco::Timestamp now;
|
Poco::Timestamp now;
|
||||||
return now - Session::getLastRequest();
|
return now - Session::getLastRequest();
|
||||||
}
|
}
|
||||||
@ -374,11 +377,11 @@ private:
|
|||||||
result.exceptions(std::ios::badbit);
|
result.exceptions(std::ios::badbit);
|
||||||
|
|
||||||
// that line is for temporary debug, will be removed
|
// that line is for temporary debug, will be removed
|
||||||
LOG_INFO(log, "Send request to {} with: version {}, method {}, usage count {}, keep-alive timeout={}, last usage ago: {}ms, headers: {}",
|
LOG_INFO(log, "Send request to {} with: version {}, method {}, request no {}, keep-alive timeout={}, last usage ago: {}ms, headers: {}",
|
||||||
request.getVersion(),
|
request.getVersion(),
|
||||||
request.getMethod(),
|
request.getMethod(),
|
||||||
getTarget(),
|
getTarget(),
|
||||||
usage_cnt,
|
Session::getKeepAliveRequest(),
|
||||||
Session::getKeepAliveTimeout().totalSeconds(),
|
Session::getKeepAliveTimeout().totalSeconds(),
|
||||||
idle.totalMilliseconds(),
|
idle.totalMilliseconds(),
|
||||||
printAllHeaders(request));
|
printAllHeaders(request));
|
||||||
@ -400,11 +403,11 @@ private:
|
|||||||
result.exceptions(std::ios::badbit);
|
result.exceptions(std::ios::badbit);
|
||||||
|
|
||||||
// that line is for temporary debug, will be removed
|
// that line is for temporary debug, will be removed
|
||||||
LOG_INFO(log, "Received response from {} with: version {}, code {}, usage count {}, keep alive header: {}, original ka {}, last usage ago: {}ms, headers: {}",
|
LOG_INFO(log, "Received response from {} with: version {}, code {}, request no {}, keep alive header: {}, original ka {}, last usage ago: {}ms, headers: {}",
|
||||||
getTarget(),
|
getTarget(),
|
||||||
response.getVersion(),
|
response.getVersion(),
|
||||||
int(response.getStatus()),
|
int(response.getStatus()),
|
||||||
usage_cnt,
|
Session::getKeepAliveRequest(),
|
||||||
response.get(Poco::Net::HTTPMessage::CONNECTION_KEEP_ALIVE, Poco::Net::HTTPMessage::EMPTY),
|
response.get(Poco::Net::HTTPMessage::CONNECTION_KEEP_ALIVE, Poco::Net::HTTPMessage::EMPTY),
|
||||||
originKA,
|
originKA,
|
||||||
idleTime().totalMilliseconds(),
|
idleTime().totalMilliseconds(),
|
||||||
@ -460,9 +463,9 @@ private:
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
Poco::Timestamp now;
|
Poco::Timestamp now;
|
||||||
LOG_INFO(log, "Expired connection to {} with: usage count {}, keep alive timeout: {}, last usage ago: {}s",
|
LOG_INFO(log, "Expired connection to {} with: request no {}, keep alive timeout: {}, last usage ago: {}s",
|
||||||
getTarget(),
|
getTarget(),
|
||||||
usage_cnt,
|
Session::getKeepAliveRequest(),
|
||||||
Session::getKeepAliveTimeout().totalSeconds(),
|
Session::getKeepAliveTimeout().totalSeconds(),
|
||||||
idleTime().totalSeconds());
|
idleTime().totalSeconds());
|
||||||
}
|
}
|
||||||
@ -474,8 +477,15 @@ private:
|
|||||||
friend class EndpointConnectionPool;
|
friend class EndpointConnectionPool;
|
||||||
|
|
||||||
template <class... Args>
|
template <class... Args>
|
||||||
explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, ConnectionGroup::Ptr group_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args)
|
explicit PooledConnection(
|
||||||
: Session(args...), pool(std::move(pool_)), group(group_), metrics(std::move(metrics_))
|
EndpointConnectionPool::WeakPtr pool_,
|
||||||
|
ConnectionGroup::Ptr group_,
|
||||||
|
IHTTPConnectionPoolForEndpoint::Metrics metrics_,
|
||||||
|
Args &&... args)
|
||||||
|
: Session(args...)
|
||||||
|
, pool(std::move(pool_))
|
||||||
|
, group(group_)
|
||||||
|
, metrics(std::move(metrics_))
|
||||||
{
|
{
|
||||||
CurrentMetrics::add(metrics.active_count);
|
CurrentMetrics::add(metrics.active_count);
|
||||||
group->atConnectionCreate();
|
group->atConnectionCreate();
|
||||||
@ -508,7 +518,7 @@ private:
|
|||||||
ConnectionGroup::Ptr group;
|
ConnectionGroup::Ptr group;
|
||||||
IHTTPConnectionPoolForEndpoint::Metrics metrics;
|
IHTTPConnectionPoolForEndpoint::Metrics metrics;
|
||||||
bool isExpired = false;
|
bool isExpired = false;
|
||||||
size_t usage_cnt = 1;
|
|
||||||
size_t exception_level = std::uncaught_exceptions();
|
size_t exception_level = std::uncaught_exceptions();
|
||||||
|
|
||||||
LoggerPtr log = getLogger("PooledConnection");
|
LoggerPtr log = getLogger("PooledConnection");
|
||||||
@ -578,7 +588,6 @@ public:
|
|||||||
stored_connections.pop();
|
stored_connections.pop();
|
||||||
|
|
||||||
setTimeouts(*it, timeouts);
|
setTimeouts(*it, timeouts);
|
||||||
it->usage_cnt += 1;
|
|
||||||
|
|
||||||
ProfileEvents::increment(getMetrics().reused, 1);
|
ProfileEvents::increment(getMetrics().reused, 1);
|
||||||
CurrentMetrics::sub(getMetrics().stored_count, 1);
|
CurrentMetrics::sub(getMetrics().stored_count, 1);
|
||||||
@ -655,47 +664,50 @@ private:
|
|||||||
return connection->isKeepAliveExpired(0.8);
|
return connection->isKeepAliveExpired(0.8);
|
||||||
}
|
}
|
||||||
|
|
||||||
ConnectionPtr allocateNewConnection()
|
|
||||||
|
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
|
||||||
{
|
{
|
||||||
ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
|
auto connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
|
||||||
|
|
||||||
connection->setKeepAlive(true);
|
connection->setKeepAlive(true);
|
||||||
|
setTimeouts(*connection, timeouts);
|
||||||
|
|
||||||
if (!proxy_configuration.isEmpty())
|
if (!proxy_configuration.isEmpty())
|
||||||
{
|
{
|
||||||
connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));
|
connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));
|
||||||
}
|
}
|
||||||
|
|
||||||
return connection;
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
|
|
||||||
{
|
|
||||||
auto address = HostResolversPool::instance().getResolver(host)->resolve();
|
auto address = HostResolversPool::instance().getResolver(host)->resolve();
|
||||||
|
connection->setResolvedHost(*address);
|
||||||
auto session = allocateNewConnection();
|
|
||||||
|
|
||||||
setTimeouts(*session, timeouts);
|
|
||||||
session->setResolvedHost(*address);
|
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds);
|
auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds);
|
||||||
session->doConnect();
|
connection->doConnect();
|
||||||
}
|
}
|
||||||
catch (...)
|
catch (...)
|
||||||
{
|
{
|
||||||
address.setFail();
|
address.setFail();
|
||||||
ProfileEvents::increment(getMetrics().errors);
|
ProfileEvents::increment(getMetrics().errors);
|
||||||
session->reset();
|
connection->reset();
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
ProfileEvents::increment(getMetrics().created);
|
ProfileEvents::increment(getMetrics().created);
|
||||||
return session;
|
return connection;
|
||||||
}
|
}
|
||||||
|
|
||||||
void atConnectionDestroy(PooledConnection & connection)
|
void atConnectionDestroy(PooledConnection & connection)
|
||||||
{
|
{
|
||||||
|
if (connection.getKeepAliveRequest() >= connection.getKeepAliveMaxRequests())
|
||||||
|
{
|
||||||
|
LOG_INFO(getLogger("PooledConnection"), "Expired by connection number {}",
|
||||||
|
connection.getKeepAliveRequest());
|
||||||
|
|
||||||
|
ProfileEvents::increment(getMetrics().expired, 1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
|
if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
|
||||||
|| group->isStoreLimitReached())
|
|| group->isStoreLimitReached())
|
||||||
{
|
{
|
||||||
@ -703,7 +715,7 @@ private:
|
|||||||
LOG_INFO(getLogger("PooledConnection"),
|
LOG_INFO(getLogger("PooledConnection"),
|
||||||
"Reset connection to {} with: usage count {}, keep alive timeout: {}, connected {}, must recon {}, last usage ago: {}, is completed {}, store limit reached {} as {}/{}, there is exception {}",
|
"Reset connection to {} with: usage count {}, keep alive timeout: {}, connected {}, must recon {}, last usage ago: {}, is completed {}, store limit reached {} as {}/{}, there is exception {}",
|
||||||
getTarget(),
|
getTarget(),
|
||||||
connection.usage_cnt,
|
connection.getKeepAliveRequest(),
|
||||||
connection.getKeepAliveTimeout().totalSeconds(),
|
connection.getKeepAliveTimeout().totalSeconds(),
|
||||||
connection.connected(),
|
connection.connected(),
|
||||||
connection.mustReconnect(),
|
connection.mustReconnect(),
|
||||||
@ -716,9 +728,8 @@ private:
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto connection_to_store = allocateNewConnection();
|
auto connection_to_store = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
|
||||||
connection_to_store->assign(connection);
|
connection_to_store->assign(connection);
|
||||||
connection_to_store->usage_cnt = connection.usage_cnt;
|
|
||||||
|
|
||||||
{
|
{
|
||||||
MemoryTrackerSwitcher switcher{&total_memory_tracker};
|
MemoryTrackerSwitcher switcher{&total_memory_tracker};
|
||||||
|
@ -47,6 +47,7 @@ struct RequestOptions
|
|||||||
{
|
{
|
||||||
size_t slowdown_receive = 0;
|
size_t slowdown_receive = 0;
|
||||||
int overwrite_keep_alive_timeout = 0;
|
int overwrite_keep_alive_timeout = 0;
|
||||||
|
int overwrite_keep_alive_max_requests = 10;
|
||||||
};
|
};
|
||||||
|
|
||||||
size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits<size_t>::max())
|
size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits<size_t>::max())
|
||||||
@ -89,8 +90,10 @@ public:
|
|||||||
int value = request.getKeepAliveTimeout();
|
int value = request.getKeepAliveTimeout();
|
||||||
ASSERT_GT(value, 0);
|
ASSERT_GT(value, 0);
|
||||||
|
|
||||||
if (options->get().overwrite_keep_alive_timeout > 0)
|
auto params = options->get();
|
||||||
response.setKeepAliveTimeout(options->get().overwrite_keep_alive_timeout);
|
|
||||||
|
if (params.overwrite_keep_alive_timeout > 0)
|
||||||
|
response.setKeepAliveTimeout(params.overwrite_keep_alive_timeout, params.overwrite_keep_alive_max_requests);
|
||||||
|
|
||||||
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
|
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
|
||||||
auto size = request.getContentLength();
|
auto size = request.getContentLength();
|
||||||
@ -99,8 +102,8 @@ public:
|
|||||||
else
|
else
|
||||||
response.setChunkedTransferEncoding(true); // or chunk encoding
|
response.setChunkedTransferEncoding(true); // or chunk encoding
|
||||||
|
|
||||||
if (options->get().slowdown_receive > 0)
|
if (params.slowdown_receive > 0)
|
||||||
sleepForSeconds(options->get().slowdown_receive);
|
sleepForSeconds(params.slowdown_receive);
|
||||||
|
|
||||||
stream_copy_n(request.stream(), response.send(), size);
|
stream_copy_n(request.stream(), response.send(), size);
|
||||||
}
|
}
|
||||||
@ -189,10 +192,11 @@ protected:
|
|||||||
options->set(std::move(opt));
|
options->set(std::move(opt));
|
||||||
}
|
}
|
||||||
|
|
||||||
void setOverWriteTimeout(size_t seconds)
|
void setOverWriteKeepAlive(size_t seconds, int max_requests)
|
||||||
{
|
{
|
||||||
auto opt = options->get();
|
auto opt = options->get();
|
||||||
opt.overwrite_keep_alive_timeout = int(seconds);
|
opt.overwrite_keep_alive_timeout = int(seconds);
|
||||||
|
opt.overwrite_keep_alive_max_requests= max_requests;
|
||||||
options->set(std::move(opt));
|
options->set(std::move(opt));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -794,7 +798,7 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
|
|||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
setOverWriteTimeout(1);
|
setOverWriteKeepAlive(1, 10);
|
||||||
auto connection = pool->getConnection(timeouts);
|
auto connection = pool->getConnection(timeouts);
|
||||||
echoRequest("Hello", *connection);
|
echoRequest("Hello", *connection);
|
||||||
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
||||||
@ -803,7 +807,7 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
|
|||||||
|
|
||||||
{
|
{
|
||||||
// server do not overwrite it in the following requests but client has to remember last agreed value
|
// server do not overwrite it in the following requests but client has to remember last agreed value
|
||||||
setOverWriteTimeout(0);
|
setOverWriteKeepAlive(0, 0);
|
||||||
auto connection = pool->getConnection(timeouts);
|
auto connection = pool->getConnection(timeouts);
|
||||||
echoRequest("Hello", *connection);
|
echoRequest("Hello", *connection);
|
||||||
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
|
||||||
@ -819,3 +823,88 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
|
|||||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||||
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ConnectionPoolTest, MaxRequests)
|
||||||
|
{
|
||||||
|
auto ka = Poco::Timespan(30, 0); // 30 seconds
|
||||||
|
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||||
|
auto max_requests = 5;
|
||||||
|
timeouts.http_keep_alive_max_requests = max_requests;
|
||||||
|
|
||||||
|
auto pool = getPool();
|
||||||
|
auto metrics = pool->getMetrics();
|
||||||
|
|
||||||
|
for (int i = 1; i <= max_requests - 1; ++i)
|
||||||
|
{
|
||||||
|
auto connection = pool->getConnection(timeouts);
|
||||||
|
echoRequest("Hello", *connection);
|
||||||
|
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||||
|
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||||
|
ASSERT_EQ(i, connection->getKeepAliveRequest());
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||||
|
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||||
|
ASSERT_EQ(max_requests-2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||||
|
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||||
|
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||||
|
|
||||||
|
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
|
||||||
|
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
|
||||||
|
|
||||||
|
{
|
||||||
|
auto connection = pool->getConnection(timeouts);
|
||||||
|
echoRequest("Hello", *connection);
|
||||||
|
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||||
|
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||||
|
ASSERT_EQ(max_requests, connection->getKeepAliveRequest());
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||||
|
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||||
|
ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||||
|
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||||
|
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||||
|
|
||||||
|
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||||
|
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
TEST_F(ConnectionPoolTest, ServerOverwriteMaxRequests)
|
||||||
|
{
|
||||||
|
auto ka = Poco::Timespan(30, 0); // 30 seconds
|
||||||
|
timeouts.withHTTPKeepAliveTimeout(ka);
|
||||||
|
|
||||||
|
auto pool = getPool();
|
||||||
|
auto metrics = pool->getMetrics();
|
||||||
|
|
||||||
|
{
|
||||||
|
auto connection = pool->getConnection(timeouts);
|
||||||
|
echoRequest("Hello", *connection);
|
||||||
|
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
|
||||||
|
ASSERT_EQ(1000, connection->getKeepAliveMaxRequests());
|
||||||
|
ASSERT_EQ(1, connection->getKeepAliveRequest());
|
||||||
|
}
|
||||||
|
|
||||||
|
auto max_requests = 3;
|
||||||
|
setOverWriteKeepAlive(5, max_requests);
|
||||||
|
|
||||||
|
for (int i = 2; i <= 10*max_requests; ++i)
|
||||||
|
{
|
||||||
|
auto connection = pool->getConnection(timeouts);
|
||||||
|
echoRequest("Hello", *connection);
|
||||||
|
ASSERT_EQ(5, connection->getKeepAliveTimeout().totalSeconds());
|
||||||
|
ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests());
|
||||||
|
ASSERT_EQ(((i-1) % max_requests) + 1, connection->getKeepAliveRequest());
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.created]);
|
||||||
|
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
|
||||||
|
ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
|
||||||
|
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
|
||||||
|
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.expired]);
|
||||||
|
|
||||||
|
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
|
||||||
|
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
|
||||||
|
}
|
||||||
|
@ -54,6 +54,7 @@ static constexpr auto DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT = 15;
|
|||||||
|
|
||||||
static constexpr auto DEFAULT_TCP_KEEP_ALIVE_TIMEOUT = 290;
|
static constexpr auto DEFAULT_TCP_KEEP_ALIVE_TIMEOUT = 290;
|
||||||
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30;
|
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30;
|
||||||
|
static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST = 1000;
|
||||||
|
|
||||||
static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/";
|
static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/";
|
||||||
|
|
||||||
|
@ -76,7 +76,8 @@ std::unique_ptr<S3::Client> getClient(
|
|||||||
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS);
|
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS);
|
||||||
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS);
|
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS);
|
||||||
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS);
|
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS);
|
||||||
client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT);
|
client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", S3::DEFAULT_KEEP_ALIVE_TIMEOUT);
|
||||||
|
client_configuration.http_keep_alive_max_requests = config.getUInt(config_prefix + ".http_keep_alive_max_requests", S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS);
|
||||||
|
|
||||||
client_configuration.endpointOverride = uri.endpoint;
|
client_configuration.endpointOverride = uri.endpoint;
|
||||||
client_configuration.s3_use_adaptive_timeouts = config.getBool(
|
client_configuration.s3_use_adaptive_timeouts = config.getBool(
|
||||||
|
@ -148,6 +148,7 @@ void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeout
|
|||||||
if (!session.connected())
|
if (!session.connected())
|
||||||
{
|
{
|
||||||
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
|
||||||
|
session.setKeepAliveMaxRequests(int(timeouts.http_keep_alive_max_requests));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -35,6 +35,8 @@ struct ConnectionTimeouts
|
|||||||
|
|
||||||
Poco::Timespan tcp_keep_alive_timeout = Poco::Timespan(DEFAULT_TCP_KEEP_ALIVE_TIMEOUT, 0);
|
Poco::Timespan tcp_keep_alive_timeout = Poco::Timespan(DEFAULT_TCP_KEEP_ALIVE_TIMEOUT, 0);
|
||||||
Poco::Timespan http_keep_alive_timeout = Poco::Timespan(DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, 0);
|
Poco::Timespan http_keep_alive_timeout = Poco::Timespan(DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, 0);
|
||||||
|
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
|
||||||
|
|
||||||
|
|
||||||
/// Timeouts for HedgedConnections
|
/// Timeouts for HedgedConnections
|
||||||
Poco::Timespan hedged_connection_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0);
|
Poco::Timespan hedged_connection_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0);
|
||||||
|
@ -22,6 +22,8 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120;
|
|||||||
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
|
inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000;
|
||||||
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
|
inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000;
|
||||||
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100;
|
inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100;
|
||||||
|
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5;
|
||||||
|
inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100;
|
||||||
|
|
||||||
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
|
/// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6.
|
||||||
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
|
static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal";
|
||||||
|
@ -52,6 +52,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
|
|||||||
/// See PoolBase::BehaviourOnLimit
|
/// See PoolBase::BehaviourOnLimit
|
||||||
bool s3_use_adaptive_timeouts = true;
|
bool s3_use_adaptive_timeouts = true;
|
||||||
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
|
size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT;
|
||||||
|
size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST;
|
||||||
|
|
||||||
std::function<void(const DB::ProxyConfiguration &)> error_report;
|
std::function<void(const DB::ProxyConfiguration &)> error_report;
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user