several fixes for client's keep alive connections

This commit is contained in:
Sema Checherinda 2024-04-03 15:13:59 +02:00
parent ec22d64fc8
commit 8e6cbc8b31
12 changed files with 502 additions and 144 deletions

View File

@ -213,6 +213,13 @@ namespace Net
Poco::Timespan getKeepAliveTimeout() const;
/// Returns the connection timeout for HTTP connections.
bool isKeepAliveExpired(double reliability = 1.0) const;
/// Returns if the connection is expired with some margin as fraction of timeout as reliability
double getKeepAliveReliability() const;
/// Returns the current fraction of keep alive timeout when connection is considered safe to use
/// It helps to avoid situation when a client uses nearly expired connection and receives NoMessageException
virtual std::ostream & sendRequest(HTTPRequest & request);
/// Sends the header for the given HTTP request to
/// the server.
@ -361,6 +368,7 @@ namespace Net
Poco::SharedPtr<std::ostream> _pRequestStream;
Poco::SharedPtr<std::istream> _pResponseStream;
static const double _defaultKeepAliveReliabilityLevel;
static ProxyConfig _globalProxyConfig;
HTTPClientSession(const HTTPClientSession &);
@ -455,6 +463,11 @@ namespace Net
_lastRequest = time;
}
inline double HTTPClientSession::getKeepAliveReliability() const
{
return _defaultKeepAliveReliabilityLevel;
}
}
} // namespace Poco::Net

View File

@ -120,6 +120,9 @@ namespace Net
/// The value is set to "Keep-Alive" if keepAlive is
/// true, or to "Close" otherwise.
void setKeepAliveTimeout(int timeout);
int getKeepAliveTimeout() const;
bool getKeepAlive() const;
/// Returns true if
/// * the message has a Connection header field and its value is "Keep-Alive"

View File

@ -44,7 +44,7 @@ namespace Net
/// - timeout: 60 seconds
/// - keepAlive: true
/// - maxKeepAliveRequests: 0
/// - keepAliveTimeout: 10 seconds
/// - keepAliveTimeout: 15 seconds
void setServerName(const std::string & serverName);
/// Sets the name and port (name:port) that the server uses to identify itself.

View File

@ -56,6 +56,8 @@ namespace Net
SocketAddress serverAddress();
/// Returns the server's address.
void setKeepAliveTimeout(Poco::Timespan keepAliveTimeout);
private:
bool _firstRequest;
Poco::Timespan _keepAliveTimeout;

View File

@ -37,6 +37,7 @@ namespace Net {
HTTPClientSession::ProxyConfig HTTPClientSession::_globalProxyConfig;
const double HTTPClientSession::_defaultKeepAliveReliabilityLevel = 0.9;
HTTPClientSession::HTTPClientSession():
@ -220,6 +221,10 @@ void HTTPClientSession::setGlobalProxyConfig(const ProxyConfig& config)
void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout)
{
if (connected())
{
throw Poco::IllegalStateException("cannot change keep alive timeout on initiated connection");
}
_keepAliveTimeout = timeout;
}
@ -243,6 +248,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request)
reconnect();
if (!keepAlive)
request.setKeepAlive(false);
if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0)
request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds());
if (!request.has(HTTPRequest::HOST) && !_host.empty())
request.setHost(_host, _port);
if (!_proxyConfig.host.empty() && !bypassProxy())
@ -324,6 +331,14 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response)
_mustReconnect = getKeepAlive() && !response.getKeepAlive();
if (!_mustReconnect)
{
/// when server sends its keep alive timeout, client has to follow that value
auto timeout = response.getKeepAliveTimeout();
if (timeout > 0)
_keepAliveTimeout = Poco::Timespan(timeout, 0);
}
if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED)
_pResponseStream = new HTTPFixedLengthInputStream(*this, 0);
else if (response.getChunkedTransferEncoding())
@ -430,15 +445,17 @@ std::string HTTPClientSession::proxyRequestPrefix() const
return result;
}
bool HTTPClientSession::isKeepAliveExpired(double reliability) const
{
Poco::Timestamp now;
return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest;
}
bool HTTPClientSession::mustReconnect() const
{
if (!_mustReconnect)
{
Poco::Timestamp now;
return _keepAliveTimeout <= now - _lastRequest;
}
else return true;
return isKeepAliveExpired(_defaultKeepAliveReliabilityLevel);
return true;
}
@ -511,14 +528,16 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session)
if (buffered())
throw Poco::LogicException("assign to a session with not empty buffered data");
attachSocket(session.detachSocket());
setLastRequest(session.getLastRequest());
setResolvedHost(session.getResolvedHost());
setKeepAlive(session.getKeepAlive());
setProxyConfig(session.getProxyConfig());
setTimeout(session.getConnectionTimeout(), session.getSendTimeout(), session.getReceiveTimeout());
setKeepAlive(session.getKeepAlive());
if (!connected())
setKeepAliveTimeout(session.getKeepAliveTimeout());
setProxyConfig(session.getProxyConfig());
attachSocket(session.detachSocket());
session.reset();
}

View File

@ -17,6 +17,7 @@
#include "Poco/NumberFormatter.h"
#include "Poco/NumberParser.h"
#include "Poco/String.h"
#include <format>
using Poco::NumberFormatter;
@ -179,4 +180,44 @@ bool HTTPMessage::getKeepAlive() const
}
void HTTPMessage::setKeepAliveTimeout(int timeout)
{
add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}", timeout));
}
int parseTimeoutFromHeaderValue(const std::string_view header_value)
{
static const std::string_view timeout_param = "timeout=";
auto timeout_pos = header_value.find(timeout_param);
if (timeout_pos == std::string::npos)
timeout_pos = header_value.size();
if (timeout_pos != header_value.size())
timeout_pos += timeout_param.size();
auto timeout_end = header_value.find(',', timeout_pos);
if (timeout_end == std::string::npos)
timeout_end = header_value.size();
auto timeout_value_substr = header_value.substr(timeout_pos, timeout_end - timeout_pos);
if (timeout_value_substr.empty())
return -1;
int value = 0;
auto [ptr, ec] = std::from_chars(timeout_value_substr.begin(), timeout_value_substr.end(), value);
if (ec == std::errc())
return value;
return -1;
}
int HTTPMessage::getKeepAliveTimeout() const
{
const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY);
return parseTimeoutFromHeaderValue(ka_header);
}
} } // namespace Poco::Net

View File

@ -88,6 +88,17 @@ void HTTPServerConnection::run()
pHandler->handleRequest(request, response);
session.setKeepAlive(_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive());
/// all that fuzz is all about to make session close with less timeout than 15s (set in HTTPServerParams c-tor)
if (_pParams->getKeepAlive() && response.getKeepAlive() && session.canKeepAlive())
{
int value = response.getKeepAliveTimeout();
if (value < 0)
value = request.getKeepAliveTimeout();
if (value > 0)
session.setKeepAliveTimeout(Poco::Timespan(value, 0));
}
}
else sendErrorResponse(session, HTTPResponse::HTTP_NOT_IMPLEMENTED);
}

View File

@ -33,6 +33,12 @@ HTTPServerSession::~HTTPServerSession()
{
}
void HTTPServerSession::setKeepAliveTimeout(Poco::Timespan keepAliveTimeout)
{
_keepAliveTimeout = keepAliveTimeout;
}
bool HTTPServerSession::hasMoreRequests()
{

View File

@ -16,6 +16,7 @@
#include <Poco/Net/HTTPRequest.h>
#include <Poco/Net/HTTPResponse.h>
#include <Poco/Net/HTTPStream.h>
#include <Poco/Net/NetException.h>
#include <Poco/Timespan.h>
#include <queue>
@ -83,17 +84,15 @@ namespace
}
size_t roundUp(size_t x, size_t rounding)
constexpr size_t roundUp(size_t x, size_t rounding)
{
chassert(rounding > 0);
return (x + (rounding - 1)) / rounding * rounding;
}
Poco::Timespan divide(const Poco::Timespan span, int divisor)
{
return Poco::Timespan(Poco::Timestamp::TimeDiff(span.totalMicroseconds() / divisor));
return (x + rounding) / rounding * rounding;
}
static_assert(roundUp(10000, 100) == 10100);
static_assert(roundUp(10001, 100) == 10100);
static_assert(roundUp(10099, 100) == 10100);
static_assert(roundUp(10100, 100) == 10200);
}
namespace DB
@ -202,8 +201,9 @@ public:
if (total_connections_in_group >= limits.warning_limit && total_connections_in_group >= mute_warning_until)
{
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}", type, total_connections_in_group, limits.warning_limit);
mute_warning_until = roundUp(total_connections_in_group, limits.warning_step);
LOG_WARNING(log, "Too many active sessions in group {}, count {}, warning limit {}, next warning at {}",
type, total_connections_in_group, limits.warning_limit, mute_warning_until);
}
}
@ -213,7 +213,7 @@ public:
--total_connections_in_group;
const size_t reduced_warning_limit = limits.warning_limit > 10 ? limits.warning_limit - 10 : 1;
const size_t reduced_warning_limit = limits.warning_limit > 10 ? limits.warning_limit - 20 : 1;
if (mute_warning_until > 0 && total_connections_in_group < reduced_warning_limit)
{
LOG_WARNING(log, "Sessions count is OK in the group {}, count {}", type, total_connections_in_group);
@ -221,6 +221,12 @@ public:
}
}
void atPoolDestroy(size_t connections)
{
std::lock_guard lock(mutex);
total_connections_in_group -= connections;
}
HTTPConnectionGroupType getType() const { return type; }
const IHTTPConnectionPoolForEndpoint::Metrics & getMetrics() const { return metrics; }
@ -273,9 +279,15 @@ private:
public:
using Ptr = std::shared_ptr<PooledConnection>;
using Session::mustReconnect;
void markAsExpired()
{
isExpired = true;
}
void reconnect() override
{
ProfileEvents::increment(metrics.reset);
Session::close();
if (auto lock = pool.lock())
@ -352,6 +364,11 @@ private:
std::istream & result = Session::receiveResponse(response);
result.exceptions(std::ios::badbit);
// that line is for temporary debug, will be removed
if (response.has(Poco::Net::HTTPMessage::CONNECTION_KEEP_ALIVE))
LOG_WARNING(log, "received keep alive header: {}",
response.get(Poco::Net::HTTPMessage::CONNECTION_KEEP_ALIVE, Poco::Net::HTTPMessage::EMPTY));
response_stream = &result;
response_stream_completed = false;
@ -392,10 +409,11 @@ private:
}
response_stream = nullptr;
group->atConnectionDestroy();
if (!isExpired)
if (auto lock = pool.lock())
lock->atConnectionDestroy(*this);
else
ProfileEvents::increment(metrics.reset);
CurrentMetrics::sub(metrics.active_count);
}
@ -404,10 +422,11 @@ private:
friend class EndpointConnectionPool;
template <class... Args>
explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args)
: Session(args...), pool(std::move(pool_)), metrics(std::move(metrics_))
explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, ConnectionGroup::Ptr group_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args)
: Session(args...), pool(std::move(pool_)), group(group_), metrics(std::move(metrics_))
{
CurrentMetrics::add(metrics.active_count);
group->atConnectionCreate();
}
template <class... Args>
@ -433,10 +452,12 @@ private:
return request_stream_completed && response_stream_completed;
}
WeakPtr pool;
EndpointConnectionPool::WeakPtr pool;
ConnectionGroup::Ptr group;
IHTTPConnectionPoolForEndpoint::Metrics metrics;
bool isExpired = false;
Poco::Logger * log = &Poco::Logger::get("PooledConnection");
LoggerPtr log = getLogger("PooledConnection");
std::ostream * request_stream = nullptr;
std::istream * response_stream = nullptr;
@ -484,7 +505,6 @@ public:
IHTTPConnectionPoolForEndpoint::ConnectionPtr getConnection(const ConnectionTimeouts & timeouts) override
{
Poco::Timestamp now;
std::vector<ConnectionPtr> expired_connections;
SCOPE_EXIT({
@ -494,8 +514,9 @@ public:
{
std::lock_guard lock(mutex);
expired_connections.reserve(stored_connections.size());
wipeExpiredImpl(expired_connections, now);
wipeExpiredImpl(expired_connections);
if (!stored_connections.empty())
{
@ -526,7 +547,6 @@ public:
size_t wipeExpired() override
{
Poco::Timestamp now;
std::vector<ConnectionPtr> expired_connections;
SCOPE_EXIT({
@ -535,19 +555,21 @@ public:
});
std::lock_guard lock(mutex);
return wipeExpiredImpl(expired_connections, now);
return wipeExpiredImpl(expired_connections);
}
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections, Poco::Timestamp now) TSA_REQUIRES(mutex)
size_t wipeExpiredImpl(std::vector<ConnectionPtr> & expired_connections) TSA_REQUIRES(mutex)
{
auto isSoftLimitReached = group->isSoftLimitReached();
while (!stored_connections.empty())
{
auto connection = stored_connections.top();
if (!isExpired(now, connection))
if (!isExpired(connection, isSoftLimitReached))
return stored_connections.size();
stored_connections.pop();
connection->markAsExpired();
expired_connections.push_back(connection);
}
@ -569,16 +591,16 @@ private:
WeakPtr getWeakFromThis() { return EndpointConnectionPool::weak_from_this(); }
bool isExpired(Poco::Timestamp & now, ConnectionPtr connection)
bool isExpired(ConnectionPtr connection, bool isSoftLimitReached) TSA_REQUIRES(mutex)
{
if (group->isSoftLimitReached())
return now > (connection->getLastRequest() + divide(connection->getKeepAliveTimeout(), 10));
return now > connection->getLastRequest() + connection->getKeepAliveTimeout();
if (isSoftLimitReached)
return connection->isKeepAliveExpired(0.1);
return connection->isKeepAliveExpired(0.8);
}
ConnectionPtr allocateNewConnection()
{
ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), getMetrics(), host, port);
ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);
connection->setKeepAlive(true);
if (!proxy_configuration.isEmpty())
@ -586,8 +608,6 @@ private:
connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));
}
group->atConnectionCreate();
return connection;
}
@ -619,8 +639,6 @@ private:
void atConnectionDestroy(PooledConnection & connection)
{
group->atConnectionDestroy();
if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
|| group->isStoreLimitReached())
{
@ -631,14 +649,14 @@ private:
auto connection_to_store = allocateNewConnection();
connection_to_store->assign(connection);
CurrentMetrics::add(getMetrics().stored_count, 1);
ProfileEvents::increment(getMetrics().preserved, 1);
{
MemoryTrackerSwitcher switcher{&total_memory_tracker};
std::lock_guard lock(mutex);
stored_connections.push(connection_to_store);
}
CurrentMetrics::add(getMetrics().stored_count, 1);
ProfileEvents::increment(getMetrics().preserved, 1);
}
@ -726,7 +744,7 @@ createConnectionPool(ConnectionGroup::Ptr group, std::string host, UInt16 port,
class HTTPConnectionPools::Impl
{
private:
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 5 * 60;
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 10 * 60;
const Poco::Timespan wipe_timeout = Poco::Timespan(DEFAULT_WIPE_TIMEOUT_SECONDS, 0);
ConnectionGroup::Ptr disk_group = std::make_shared<ConnectionGroup>(HTTPConnectionGroupType::DISK);

View File

@ -2,7 +2,6 @@
#include <Common/HTTPConnectionPool.h>
#include <Poco/URI.h>
#include <Poco/Net/ServerSocket.h>
#include <Poco/Net/MessageHeader.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
@ -17,6 +16,39 @@
namespace
{
template<class T>
class SafeHandler
{
public:
using Ptr = std::shared_ptr<SafeHandler<T>>;
SafeHandler() = default;
SafeHandler(SafeHandler<T>&) = delete;
SafeHandler& operator=(SafeHandler<T>&) = delete;
T get()
{
std::lock_guard lock(mutex);
return obj;
}
void set(T && options_)
{
std::lock_guard lock(mutex);
obj = std::move(options_);
}
protected:
std::mutex mutex;
T obj = {};
};
struct RequestOptions
{
size_t slowdown_receive = 0;
int overwrite_keep_alive_timeout = 0;
};
size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits<size_t>::max())
{
const size_t buffer_size = 4096;
@ -47,13 +79,19 @@ size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count =
class MockRequestHandler : public Poco::Net::HTTPRequestHandler
{
public:
explicit MockRequestHandler(std::shared_ptr<std::atomic<size_t>> slowdown_)
: slowdown(std::move(slowdown_))
explicit MockRequestHandler(SafeHandler<RequestOptions>::Ptr options_)
: options(options_)
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override
{
int value = request.getKeepAliveTimeout();
ASSERT_GT(value, 0);
if (options->get().overwrite_keep_alive_timeout > 0)
response.setKeepAliveTimeout(options->get().overwrite_keep_alive_timeout);
response.setStatus(Poco::Net::HTTPResponse::HTTP_OK);
auto size = request.getContentLength();
if (size > 0)
@ -61,28 +99,29 @@ public:
else
response.setChunkedTransferEncoding(true); // or chunk encoding
sleepForSeconds(*slowdown);
if (options->get().slowdown_receive > 0)
sleepForSeconds(options->get().slowdown_receive);
stream_copy_n(request.stream(), response.send(), size);
}
std::shared_ptr<std::atomic<size_t>> slowdown;
SafeHandler<RequestOptions>::Ptr options;
};
class HTTPRequestHandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
explicit HTTPRequestHandlerFactory(std::shared_ptr<std::atomic<size_t>> slowdown_)
: slowdown(std::move(slowdown_))
explicit HTTPRequestHandlerFactory(SafeHandler<RequestOptions>::Ptr options_)
: options(options_)
{
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest &) override
{
return new MockRequestHandler(slowdown);
return new MockRequestHandler(options);
}
std::shared_ptr<std::atomic<size_t>> slowdown;
SafeHandler<RequestOptions>::Ptr options;
};
}
@ -94,6 +133,8 @@ class ConnectionPoolTest : public testing::Test {
protected:
ConnectionPoolTest()
{
options = std::make_shared<SafeHandler<RequestOptions>>();
startServer();
}
@ -102,7 +143,7 @@ protected:
DB::HTTPConnectionPools::Limits def_limits{};
DB::HTTPConnectionPools::instance().setLimits(def_limits, def_limits, def_limits);
setSlowDown(0);
options->set(RequestOptions());
DB::HTTPConnectionPools::instance().dropCache();
DB::CurrentThread::getProfileEvents().reset();
@ -129,7 +170,7 @@ protected:
void startServer()
{
server_data.reset();
server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive);
server_data.handler_factory = new HTTPRequestHandlerFactory(options);
server_data.server = std::make_unique<Poco::Net::HTTPServer>(
server_data.handler_factory, server_data.port);
@ -143,11 +184,20 @@ protected:
void setSlowDown(size_t seconds)
{
*slowdown_receive = seconds;
auto opt = options->get();
opt.slowdown_receive = seconds;
options->set(std::move(opt));
}
void setOverWriteTimeout(size_t seconds)
{
auto opt = options->get();
opt.overwrite_keep_alive_timeout = int(seconds);
options->set(std::move(opt));
}
DB::ConnectionTimeouts timeouts;
std::shared_ptr<std::atomic<size_t>> slowdown_receive = std::make_shared<std::atomic<size_t>>(0);
SafeHandler<RequestOptions>::Ptr options;
struct ServerData
{
@ -182,7 +232,7 @@ protected:
void wait_until(std::function<bool()> pred)
{
while (!pred())
sleepForMilliseconds(250);
sleepForMilliseconds(10);
}
void echoRequest(String data, HTTPSession & session)
@ -245,45 +295,52 @@ TEST_F(ConnectionPoolTest, CanRequest)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
auto metrics = pool->getMetrics();
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanPreserve)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
}
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
}
TEST_F(ConnectionPoolTest, CanReuse)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
}
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().stored_count));
{
auto connection = pool->getConnection(timeouts);
ASSERT_EQ(1, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
@ -293,6 +350,11 @@ TEST_F(ConnectionPoolTest, CanReuse)
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
connection->reset();
}
@ -303,15 +365,16 @@ TEST_F(ConnectionPoolTest, CanReuse)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
}
TEST_F(ConnectionPoolTest, CanReuse10)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
for (int i = 0; i < 10; ++i)
{
@ -328,16 +391,23 @@ TEST_F(ConnectionPoolTest, CanReuse10)
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(1, getServer().totalConnections());
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReuse5)
{
timeouts.withHTTPKeepAliveTimeout(1);
auto ka = Poco::Timespan(1, 0); // 1 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<DB::HTTPSessionPtr> connections;
connections.reserve(5);
@ -347,11 +417,14 @@ TEST_F(ConnectionPoolTest, CanReuse5)
}
connections.clear();
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
wait_until([&] () { return getServer().currentConnections() == 5; });
ASSERT_EQ(5, getServer().currentConnections());
@ -363,35 +436,56 @@ TEST_F(ConnectionPoolTest, CanReuse5)
echoRequest("Hello", *connection);
}
ASSERT_EQ(5, getServer().totalConnections());
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(5, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(5, CurrentMetrics::get(metrics.stored_count));
/// wait until all connections are timeouted
wait_until([&] () { return getServer().currentConnections() == 0; });
{
// just to trigger pool->wipeExpired();
auto connection = pool->getConnection(timeouts);
connection->reset();
}
ASSERT_EQ(6, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(5, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<HTTPSessionPtr> in_use;
const size_t count = 2;
const size_t count = 3;
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
in_use.push_back(connection);
}
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(count, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
auto connection = std::move(in_use.back());
in_use.pop_back();
@ -402,28 +496,39 @@ TEST_F(ConnectionPoolTest, CanReconnectAndCreate)
echoRequest("Hello", *connection);
connection->reset();
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
wait_until([&] () { return getServer().currentConnections() == 1; });
ASSERT_EQ(1, getServer().currentConnections());
ASSERT_EQ(count+1, getServer().totalConnections());
ASSERT_EQ(count+1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
{
auto ka = Poco::Timespan(1, 0); // 1 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
std::vector<HTTPSessionPtr> in_use;
const size_t count = 2;
const size_t count = 3;
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
/// make some request in order to show to the server the keep alive headers
echoRequest("Hello", *connection);
in_use.push_back(std::move(connection));
}
in_use.clear();
for (int i = 0; i < count; ++i)
{
auto connection = pool->getConnection(timeouts);
// DB::setReuseTag(*connection);
in_use.push_back(std::move(connection));
}
@ -441,11 +546,16 @@ TEST_F(ConnectionPoolTest, CanReconnectAndReuse)
wait_until([&] () { return getServer().currentConnections() == 0; });
ASSERT_EQ(0, getServer().currentConnections());
ASSERT_EQ(2, getServer().totalConnections());
ASSERT_EQ(count, getServer().totalConnections());
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(count, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(count + count - 1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(count + 1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(count-1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(count-2, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReceiveTimeout)
@ -454,6 +564,7 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
timeouts.withReceiveTimeout(1);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
@ -462,10 +573,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
{
timeouts.withReceiveTimeout(3);
@ -475,10 +590,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
{
/// timeouts have effect for reused session
@ -489,10 +608,14 @@ TEST_F(ConnectionPoolTest, ReceiveTimeout)
);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reused]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
@ -500,6 +623,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
std::string_view message = "Hello ReadWriteBufferFromHTTP";
auto uri = Poco::URI(getServerUrl());
auto metrics = DB::HTTPConnectionPools::instance().getPool(DB::HTTPConnectionGroupType::HTTP, uri, DB::ProxyConfiguration{})->getMetrics();
Poco::Net::HTTPBasicCredentials empty_creds;
auto buf_from_http = DB::BuilderRWBufferFromHTTP(uri)
.withConnectionGroup(DB::HTTPConnectionGroupType::HTTP)
@ -527,6 +651,7 @@ TEST_F(ConnectionPoolTest, ReadWriteBufferFromHTTP)
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
@ -538,23 +663,26 @@ TEST_F(ConnectionPoolTest, HardLimit)
DB::HTTPConnectionPools::instance().setLimits(zero_limits, zero_limits, zero_limits);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
}
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, NoReceiveCall)
{
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
@ -570,11 +698,124 @@ TEST_F(ConnectionPoolTest, NoReceiveCall)
connection->flushRequest();
}
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().active_count));
ASSERT_EQ(0, CurrentMetrics::get(pool->getMetrics().stored_count));
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().created]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[pool->getMetrics().preserved]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[pool->getMetrics().reset]);
ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsHoldTooLong)
{
auto ka = Poco::Timespan(1, 0); // 1 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
auto fake_ka = Poco::Timespan(30 * 1000 * 1000); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(fake_ka);
DB::setTimeouts(*connection, timeouts); // new keep alive timeout has no effect
wait_until([&] () { return getServer().currentConnections() == 0; });
ASSERT_EQ(1, connection->connected());
ASSERT_EQ(1, connection->getKeepAlive());
ASSERT_EQ(1000, connection->getKeepAliveTimeout().totalMilliseconds());
ASSERT_EQ(1, connection->isKeepAliveExpired(connection->getKeepAliveReliability()));
echoRequest("Hello", *connection);
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ReconnectedWhenConnectionIsNearlyExpired)
{
auto ka = Poco::Timespan(1, 0); // 1 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
}
sleepForMilliseconds(900);
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
}
}
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}
TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive)
{
auto ka = Poco::Timespan(30, 0); // 30 seconds
timeouts.withHTTPKeepAliveTimeout(ka);
auto pool = getPool();
auto metrics = pool->getMetrics();
{
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds());
}
{
setOverWriteTimeout(1);
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
}
{
// server do not overwrite it in the following requests but client has to remember last agreed value
setOverWriteTimeout(0);
auto connection = pool->getConnection(timeouts);
echoRequest("Hello", *connection);
ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds());
ASSERT_EQ(1, connection->getKeepAliveTimeout().totalSeconds());
}
ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]);
ASSERT_EQ(3, DB::CurrentThread::getProfileEvents()[metrics.preserved]);
ASSERT_EQ(2, DB::CurrentThread::getProfileEvents()[metrics.reused]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]);
ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]);
ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count));
ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count));
}

View File

@ -128,9 +128,9 @@ namespace DB
M(Bool, format_alter_operations_with_parentheses, false, "If enabled, each operation in alter queries will be surrounded with parentheses in formatted queries to make them less ambiguous.", 0) \
M(String, default_replica_path, "/clickhouse/tables/{uuid}/{shard}", "The path to the table in ZooKeeper", 0) \
M(String, default_replica_name, "{replica}", "The replica name in ZooKeeper", 0) \
M(UInt64, disk_connections_soft_limit, 1000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_soft_limit, 5000, "Connections above this limit have significantly shorter time to live. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_warn_limit, 10000, "Warning massages are written to the logs if number of in-use connections are higher than this limit. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_store_limit, 12000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
M(UInt64, disk_connections_store_limit, 30000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the disks connections.", 0) \
M(UInt64, storage_connections_soft_limit, 100, "Connections above this limit have significantly shorter time to live. The limit applies to the storages connections.", 0) \
M(UInt64, storage_connections_warn_limit, 1000, "Warning massages are written to the logs if number of in-use connections are higher than this limit. The limit applies to the storages connections.", 0) \
M(UInt64, storage_connections_store_limit, 5000, "Connections above this limit reset after use. Set to 0 to turn connection cache off. The limit applies to the storages connections.", 0) \

View File

@ -144,8 +144,12 @@ ConnectionTimeouts ConnectionTimeouts::getAdaptiveTimeouts(const String & method
void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeouts & timeouts)
{
session.setTimeout(timeouts.connection_timeout, timeouts.send_timeout, timeouts.receive_timeout);
/// we can not change keep alive timeout for already initiated connections
if (!session.connected())
{
session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout);
}
}
ConnectionTimeouts getTimeouts(const Poco::Net::HTTPClientSession & session)
{