From ae3a1999398b4f16880e2d892cb11bb414944b81 Mon Sep 17 00:00:00 2001 From: Sema Checherinda Date: Thu, 4 Apr 2024 22:49:52 +0200 Subject: [PATCH] support max requests for keep alive --- .../Net/include/Poco/Net/HTTPClientSession.h | 20 ++++ base/poco/Net/include/Poco/Net/HTTPMessage.h | 3 +- base/poco/Net/src/HTTPClientSession.cpp | 33 +++++- base/poco/Net/src/HTTPMessage.cpp | 37 ++++--- src/Common/HTTPConnectionPool.cpp | 69 +++++++----- src/Common/tests/gtest_connection_pool.cpp | 103 ++++++++++++++++-- src/Core/Defines.h | 1 + src/Disks/ObjectStorages/S3/diskSettings.cpp | 3 +- src/IO/ConnectionTimeouts.cpp | 1 + src/IO/ConnectionTimeouts.h | 2 + src/IO/S3/Credentials.h | 2 + src/IO/S3/PocoHTTPClient.h | 1 + 12 files changed, 219 insertions(+), 56 deletions(-) diff --git a/base/poco/Net/include/Poco/Net/HTTPClientSession.h b/base/poco/Net/include/Poco/Net/HTTPClientSession.h index cbf4619834b..edbb135d8c6 100644 --- a/base/poco/Net/include/Poco/Net/HTTPClientSession.h +++ b/base/poco/Net/include/Poco/Net/HTTPClientSession.h @@ -213,6 +213,12 @@ namespace Net Poco::Timespan getKeepAliveTimeout() const; /// Returns the connection timeout for HTTP connections. + void setKeepAliveMaxRequests(int max_requests); + + int getKeepAliveMaxRequests() const; + + int getKeepAliveRequest() const; + bool isKeepAliveExpired(double reliability = 1.0) const; /// Returns if the connection is expired with some margin as fraction of timeout as reliability @@ -352,6 +358,8 @@ namespace Net void assign(HTTPClientSession & session); + void setKeepAliveRequest(int request); + HTTPSessionFactory _proxySessionFactory; /// Factory to create HTTPClientSession to proxy. private: @@ -360,6 +368,8 @@ namespace Net Poco::UInt16 _port; ProxyConfig _proxyConfig; Poco::Timespan _keepAliveTimeout; + int _keepAliveCurrentRequest = 0; + int _keepAliveMaxRequests = 1000; Poco::Timestamp _lastRequest; bool _reconnect; bool _mustReconnect; @@ -463,6 +473,16 @@ namespace Net return _defaultKeepAliveReliabilityLevel; } + inline int HTTPClientSession::getKeepAliveMaxRequests() const + { + return _keepAliveMaxRequests; + } + + inline int HTTPClientSession::getKeepAliveRequest() const + { + return _keepAliveCurrentRequest; + } + } } // namespace Poco::Net diff --git a/base/poco/Net/include/Poco/Net/HTTPMessage.h b/base/poco/Net/include/Poco/Net/HTTPMessage.h index 994807ffbff..8bc95ccc1af 100644 --- a/base/poco/Net/include/Poco/Net/HTTPMessage.h +++ b/base/poco/Net/include/Poco/Net/HTTPMessage.h @@ -120,8 +120,9 @@ namespace Net /// The value is set to "Keep-Alive" if keepAlive is /// true, or to "Close" otherwise. - void setKeepAliveTimeout(int timeout); + void setKeepAliveTimeout(int timeout, int max_requests); int getKeepAliveTimeout() const; + int getKeepAliveMaxRequests() const; bool getKeepAlive() const; /// Returns true if diff --git a/base/poco/Net/src/HTTPClientSession.cpp b/base/poco/Net/src/HTTPClientSession.cpp index bc70559c5eb..e489ab56b98 100644 --- a/base/poco/Net/src/HTTPClientSession.cpp +++ b/base/poco/Net/src/HTTPClientSession.cpp @@ -230,7 +230,25 @@ void HTTPClientSession::setKeepAliveTimeout(const Poco::Timespan& timeout) } -void HTTPClientSession::setLastRequest(Poco::Timestamp time) +void HTTPClientSession::setKeepAliveMaxRequests(int max_requests) +{ + if (connected()) + { + throw Poco::IllegalStateException("cannot change keep alive max requests on initiated connection, " + "That value is managed privately after connection is established."); + } + _keepAliveMaxRequests = max_requests; +} + + +void HTTPClientSession::setKeepAliveRequest(int request) +{ + _keepAliveCurrentRequest = request; +} + + + + void HTTPClientSession::setLastRequest(Poco::Timestamp time) { if (connected()) { @@ -248,6 +266,8 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request) clearException(); _responseReceived = false; + _keepAliveCurrentRequest += 1; + bool keepAlive = getKeepAlive(); if (((connected() && !keepAlive) || mustReconnect()) && !_host.empty()) { @@ -261,7 +281,7 @@ std::ostream& HTTPClientSession::sendRequest(HTTPRequest& request) if (!request.has(HTTPMessage::CONNECTION)) request.setKeepAlive(keepAlive); if (keepAlive && !request.has(HTTPMessage::CONNECTION_KEEP_ALIVE) && _keepAliveTimeout.totalSeconds() > 0) - request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds()); + request.setKeepAliveTimeout(_keepAliveTimeout.totalSeconds(), _keepAliveMaxRequests); if (!request.has(HTTPRequest::HOST) && !_host.empty()) request.setHost(_host, _port); if (!_proxyConfig.host.empty() && !bypassProxy()) @@ -349,6 +369,9 @@ std::istream& HTTPClientSession::receiveResponse(HTTPResponse& response) auto timeout = response.getKeepAliveTimeout(); if (timeout > 0) _keepAliveTimeout = std::min(_keepAliveTimeout, Poco::Timespan(timeout, 0)); + auto max_requests = response.getKeepAliveMaxRequests(); + if (max_requests > 0) + _keepAliveMaxRequests = std::min(_keepAliveMaxRequests, max_requests); } if (!_expectResponseBody || response.getStatus() < 200 || response.getStatus() == HTTPResponse::HTTP_NO_CONTENT || response.getStatus() == HTTPResponse::HTTP_NOT_MODIFIED) @@ -460,7 +483,8 @@ std::string HTTPClientSession::proxyRequestPrefix() const bool HTTPClientSession::isKeepAliveExpired(double reliability) const { Poco::Timestamp now; - return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest; + return Timespan(Timestamp::TimeDiff(reliability *_keepAliveTimeout.totalMicroseconds())) <= now - _lastRequest + || _keepAliveCurrentRequest > _keepAliveMaxRequests; } bool HTTPClientSession::mustReconnect() const @@ -551,6 +575,9 @@ void HTTPClientSession::assign(Poco::Net::HTTPClientSession & session) setLastRequest(session.getLastRequest()); setKeepAliveTimeout(session.getKeepAliveTimeout()); + _keepAliveMaxRequests = session._keepAliveMaxRequests; + _keepAliveCurrentRequest = session._keepAliveCurrentRequest; + attachSocket(session.detachSocket()); session.reset(); diff --git a/base/poco/Net/src/HTTPMessage.cpp b/base/poco/Net/src/HTTPMessage.cpp index af743dfa2eb..c0083ec410c 100644 --- a/base/poco/Net/src/HTTPMessage.cpp +++ b/base/poco/Net/src/HTTPMessage.cpp @@ -180,27 +180,25 @@ bool HTTPMessage::getKeepAlive() const } -void HTTPMessage::setKeepAliveTimeout(int timeout) +void HTTPMessage::setKeepAliveTimeout(int timeout, int max_requests) { - add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max=1000", timeout)); + add(HTTPMessage::CONNECTION_KEEP_ALIVE, std::format("timeout={}, max={}", timeout, max_requests)); } -int parseTimeoutFromHeaderValue(const std::string_view header_value) +int parseFromHeaderValues(const std::string_view header_value, const std::string_view param_name) { - static const std::string_view timeout_param = "timeout="; + auto param_value_pos = header_value.find(param_name); + if (param_value_pos == std::string::npos) + param_value_pos = header_value.size(); + if (param_value_pos != header_value.size()) + param_value_pos += param_name.size(); - auto timeout_pos = header_value.find(timeout_param); - if (timeout_pos == std::string::npos) - timeout_pos = header_value.size(); - if (timeout_pos != header_value.size()) - timeout_pos += timeout_param.size(); + auto param_value_end = header_value.find(',', param_value_pos); + if (param_value_end == std::string::npos) + param_value_end = header_value.size(); - auto timeout_end = header_value.find(',', timeout_pos); - if (timeout_end == std::string::npos) - timeout_end = header_value.size(); - - auto timeout_value_substr = header_value.substr(timeout_pos, timeout_end - timeout_pos); + auto timeout_value_substr = header_value.substr(param_value_pos, param_value_end - param_value_pos); if (timeout_value_substr.empty()) return -1; @@ -217,7 +215,16 @@ int parseTimeoutFromHeaderValue(const std::string_view header_value) int HTTPMessage::getKeepAliveTimeout() const { const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY); - return parseTimeoutFromHeaderValue(ka_header); + static const std::string_view timeout_param = "timeout="; + return parseFromHeaderValues(ka_header, timeout_param); +} + + +int HTTPMessage::getKeepAliveMaxRequests() const +{ + const std::string& ka_header = get(HTTPMessage::CONNECTION_KEEP_ALIVE, HTTPMessage::EMPTY); + static const std::string_view timeout_param = "max="; + return parseFromHeaderValues(ka_header, timeout_param); } } } // namespace Poco::Net diff --git a/src/Common/HTTPConnectionPool.cpp b/src/Common/HTTPConnectionPool.cpp index eb6ce00e611..926222934e4 100644 --- a/src/Common/HTTPConnectionPool.cpp +++ b/src/Common/HTTPConnectionPool.cpp @@ -301,6 +301,8 @@ private: auto timeouts = getTimeouts(*this); auto new_connection = lock->getConnection(timeouts); Session::assign(*new_connection); + if (Session::getKeepAliveRequest() == 0) + Session::setKeepAliveRequest(1); } else { @@ -322,7 +324,8 @@ private: Session::getPort()); } - Poco::Timespan idleTime() { + Poco::Timespan idleTime() + { Poco::Timestamp now; return now - Session::getLastRequest(); } @@ -374,11 +377,11 @@ private: result.exceptions(std::ios::badbit); // that line is for temporary debug, will be removed - LOG_INFO(log, "Send request to {} with: version {}, method {}, usage count {}, keep-alive timeout={}, last usage ago: {}ms, headers: {}", + LOG_INFO(log, "Send request to {} with: version {}, method {}, request no {}, keep-alive timeout={}, last usage ago: {}ms, headers: {}", request.getVersion(), request.getMethod(), getTarget(), - usage_cnt, + Session::getKeepAliveRequest(), Session::getKeepAliveTimeout().totalSeconds(), idle.totalMilliseconds(), printAllHeaders(request)); @@ -400,11 +403,11 @@ private: result.exceptions(std::ios::badbit); // that line is for temporary debug, will be removed - LOG_INFO(log, "Received response from {} with: version {}, code {}, usage count {}, keep alive header: {}, original ka {}, last usage ago: {}ms, headers: {}", + LOG_INFO(log, "Received response from {} with: version {}, code {}, request no {}, keep alive header: {}, original ka {}, last usage ago: {}ms, headers: {}", getTarget(), response.getVersion(), int(response.getStatus()), - usage_cnt, + Session::getKeepAliveRequest(), response.get(Poco::Net::HTTPMessage::CONNECTION_KEEP_ALIVE, Poco::Net::HTTPMessage::EMPTY), originKA, idleTime().totalMilliseconds(), @@ -460,9 +463,9 @@ private: else { Poco::Timestamp now; - LOG_INFO(log, "Expired connection to {} with: usage count {}, keep alive timeout: {}, last usage ago: {}s", + LOG_INFO(log, "Expired connection to {} with: request no {}, keep alive timeout: {}, last usage ago: {}s", getTarget(), - usage_cnt, + Session::getKeepAliveRequest(), Session::getKeepAliveTimeout().totalSeconds(), idleTime().totalSeconds()); } @@ -474,8 +477,15 @@ private: friend class EndpointConnectionPool; template - explicit PooledConnection(EndpointConnectionPool::WeakPtr pool_, ConnectionGroup::Ptr group_, IHTTPConnectionPoolForEndpoint::Metrics metrics_, Args &&... args) - : Session(args...), pool(std::move(pool_)), group(group_), metrics(std::move(metrics_)) + explicit PooledConnection( + EndpointConnectionPool::WeakPtr pool_, + ConnectionGroup::Ptr group_, + IHTTPConnectionPoolForEndpoint::Metrics metrics_, + Args &&... args) + : Session(args...) + , pool(std::move(pool_)) + , group(group_) + , metrics(std::move(metrics_)) { CurrentMetrics::add(metrics.active_count); group->atConnectionCreate(); @@ -508,7 +518,7 @@ private: ConnectionGroup::Ptr group; IHTTPConnectionPoolForEndpoint::Metrics metrics; bool isExpired = false; - size_t usage_cnt = 1; + size_t exception_level = std::uncaught_exceptions(); LoggerPtr log = getLogger("PooledConnection"); @@ -578,7 +588,6 @@ public: stored_connections.pop(); setTimeouts(*it, timeouts); - it->usage_cnt += 1; ProfileEvents::increment(getMetrics().reused, 1); CurrentMetrics::sub(getMetrics().stored_count, 1); @@ -655,47 +664,50 @@ private: return connection->isKeepAliveExpired(0.8); } - ConnectionPtr allocateNewConnection() + + ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts) { - ConnectionPtr connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port); + auto connection = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port); + connection->setKeepAlive(true); + setTimeouts(*connection, timeouts); if (!proxy_configuration.isEmpty()) { connection->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration)); } - return connection; - } - - ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts) - { auto address = HostResolversPool::instance().getResolver(host)->resolve(); - - auto session = allocateNewConnection(); - - setTimeouts(*session, timeouts); - session->setResolvedHost(*address); + connection->setResolvedHost(*address); try { auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds); - session->doConnect(); + connection->doConnect(); } catch (...) { address.setFail(); ProfileEvents::increment(getMetrics().errors); - session->reset(); + connection->reset(); throw; } ProfileEvents::increment(getMetrics().created); - return session; + return connection; } void atConnectionDestroy(PooledConnection & connection) { + if (connection.getKeepAliveRequest() >= connection.getKeepAliveMaxRequests()) + { + LOG_INFO(getLogger("PooledConnection"), "Expired by connection number {}", + connection.getKeepAliveRequest()); + + ProfileEvents::increment(getMetrics().expired, 1); + return; + } + if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered() || group->isStoreLimitReached()) { @@ -703,7 +715,7 @@ private: LOG_INFO(getLogger("PooledConnection"), "Reset connection to {} with: usage count {}, keep alive timeout: {}, connected {}, must recon {}, last usage ago: {}, is completed {}, store limit reached {} as {}/{}, there is exception {}", getTarget(), - connection.usage_cnt, + connection.getKeepAliveRequest(), connection.getKeepAliveTimeout().totalSeconds(), connection.connected(), connection.mustReconnect(), @@ -716,9 +728,8 @@ private: return; } - auto connection_to_store = allocateNewConnection(); + auto connection_to_store = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port); connection_to_store->assign(connection); - connection_to_store->usage_cnt = connection.usage_cnt; { MemoryTrackerSwitcher switcher{&total_memory_tracker}; diff --git a/src/Common/tests/gtest_connection_pool.cpp b/src/Common/tests/gtest_connection_pool.cpp index 36bf8bc7dae..cc091d12bb0 100644 --- a/src/Common/tests/gtest_connection_pool.cpp +++ b/src/Common/tests/gtest_connection_pool.cpp @@ -47,6 +47,7 @@ struct RequestOptions { size_t slowdown_receive = 0; int overwrite_keep_alive_timeout = 0; + int overwrite_keep_alive_max_requests = 10; }; size_t stream_copy_n(std::istream & in, std::ostream & out, std::size_t count = std::numeric_limits::max()) @@ -89,8 +90,10 @@ public: int value = request.getKeepAliveTimeout(); ASSERT_GT(value, 0); - if (options->get().overwrite_keep_alive_timeout > 0) - response.setKeepAliveTimeout(options->get().overwrite_keep_alive_timeout); + auto params = options->get(); + + if (params.overwrite_keep_alive_timeout > 0) + response.setKeepAliveTimeout(params.overwrite_keep_alive_timeout, params.overwrite_keep_alive_max_requests); response.setStatus(Poco::Net::HTTPResponse::HTTP_OK); auto size = request.getContentLength(); @@ -99,8 +102,8 @@ public: else response.setChunkedTransferEncoding(true); // or chunk encoding - if (options->get().slowdown_receive > 0) - sleepForSeconds(options->get().slowdown_receive); + if (params.slowdown_receive > 0) + sleepForSeconds(params.slowdown_receive); stream_copy_n(request.stream(), response.send(), size); } @@ -189,10 +192,11 @@ protected: options->set(std::move(opt)); } - void setOverWriteTimeout(size_t seconds) + void setOverWriteKeepAlive(size_t seconds, int max_requests) { auto opt = options->get(); opt.overwrite_keep_alive_timeout = int(seconds); + opt.overwrite_keep_alive_max_requests= max_requests; options->set(std::move(opt)); } @@ -794,7 +798,7 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive) } { - setOverWriteTimeout(1); + setOverWriteKeepAlive(1, 10); auto connection = pool->getConnection(timeouts); echoRequest("Hello", *connection); ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds()); @@ -803,7 +807,7 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive) { // server do not overwrite it in the following requests but client has to remember last agreed value - setOverWriteTimeout(0); + setOverWriteKeepAlive(0, 0); auto connection = pool->getConnection(timeouts); echoRequest("Hello", *connection); ASSERT_EQ(30, timeouts.http_keep_alive_timeout.totalSeconds()); @@ -819,3 +823,88 @@ TEST_F(ConnectionPoolTest, ServerOverwriteKeepAlive) ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count)); ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count)); } + +TEST_F(ConnectionPoolTest, MaxRequests) +{ + auto ka = Poco::Timespan(30, 0); // 30 seconds + timeouts.withHTTPKeepAliveTimeout(ka); + auto max_requests = 5; + timeouts.http_keep_alive_max_requests = max_requests; + + auto pool = getPool(); + auto metrics = pool->getMetrics(); + + for (int i = 1; i <= max_requests - 1; ++i) + { + auto connection = pool->getConnection(timeouts); + echoRequest("Hello", *connection); + ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds()); + ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests()); + ASSERT_EQ(i, connection->getKeepAliveRequest()); + } + + ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]); + ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]); + ASSERT_EQ(max_requests-2, DB::CurrentThread::getProfileEvents()[metrics.reused]); + ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]); + ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.expired]); + + ASSERT_EQ(1, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(1, CurrentMetrics::get(metrics.stored_count)); + + { + auto connection = pool->getConnection(timeouts); + echoRequest("Hello", *connection); + ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds()); + ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests()); + ASSERT_EQ(max_requests, connection->getKeepAliveRequest()); + } + + ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.created]); + ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.preserved]); + ASSERT_EQ(max_requests-1, DB::CurrentThread::getProfileEvents()[metrics.reused]); + ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]); + ASSERT_EQ(1, DB::CurrentThread::getProfileEvents()[metrics.expired]); + + ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count)); +} + + +TEST_F(ConnectionPoolTest, ServerOverwriteMaxRequests) +{ + auto ka = Poco::Timespan(30, 0); // 30 seconds + timeouts.withHTTPKeepAliveTimeout(ka); + + auto pool = getPool(); + auto metrics = pool->getMetrics(); + + { + auto connection = pool->getConnection(timeouts); + echoRequest("Hello", *connection); + ASSERT_EQ(30, connection->getKeepAliveTimeout().totalSeconds()); + ASSERT_EQ(1000, connection->getKeepAliveMaxRequests()); + ASSERT_EQ(1, connection->getKeepAliveRequest()); + } + + auto max_requests = 3; + setOverWriteKeepAlive(5, max_requests); + + for (int i = 2; i <= 10*max_requests; ++i) + { + auto connection = pool->getConnection(timeouts); + echoRequest("Hello", *connection); + ASSERT_EQ(5, connection->getKeepAliveTimeout().totalSeconds()); + ASSERT_EQ(max_requests, connection->getKeepAliveMaxRequests()); + ASSERT_EQ(((i-1) % max_requests) + 1, connection->getKeepAliveRequest()); + } + + ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.created]); + ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.preserved]); + ASSERT_EQ(10*max_requests-10, DB::CurrentThread::getProfileEvents()[metrics.reused]); + ASSERT_EQ(0, DB::CurrentThread::getProfileEvents()[metrics.reset]); + ASSERT_EQ(10, DB::CurrentThread::getProfileEvents()[metrics.expired]); + + ASSERT_EQ(0, CurrentMetrics::get(metrics.active_count)); + ASSERT_EQ(0, CurrentMetrics::get(metrics.stored_count)); +} diff --git a/src/Core/Defines.h b/src/Core/Defines.h index a8dd26519c2..f2142bc764d 100644 --- a/src/Core/Defines.h +++ b/src/Core/Defines.h @@ -54,6 +54,7 @@ static constexpr auto DEFAULT_COUNT_OF_HTTP_CONNECTIONS_PER_ENDPOINT = 15; static constexpr auto DEFAULT_TCP_KEEP_ALIVE_TIMEOUT = 290; static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT = 30; +static constexpr auto DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST = 1000; static constexpr auto DBMS_DEFAULT_PATH = "/var/lib/clickhouse/"; diff --git a/src/Disks/ObjectStorages/S3/diskSettings.cpp b/src/Disks/ObjectStorages/S3/diskSettings.cpp index 7ce94699053..c3114eb0b6f 100644 --- a/src/Disks/ObjectStorages/S3/diskSettings.cpp +++ b/src/Disks/ObjectStorages/S3/diskSettings.cpp @@ -76,7 +76,8 @@ std::unique_ptr getClient( client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", S3::DEFAULT_CONNECT_TIMEOUT_MS); client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", S3::DEFAULT_REQUEST_TIMEOUT_MS); client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", S3::DEFAULT_MAX_CONNECTIONS); - client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT); + client_configuration.http_keep_alive_timeout = config.getUInt(config_prefix + ".http_keep_alive_timeout", S3::DEFAULT_KEEP_ALIVE_TIMEOUT); + client_configuration.http_keep_alive_max_requests = config.getUInt(config_prefix + ".http_keep_alive_max_requests", S3::DEFAULT_KEEP_ALIVE_MAX_REQUESTS); client_configuration.endpointOverride = uri.endpoint; client_configuration.s3_use_adaptive_timeouts = config.getBool( diff --git a/src/IO/ConnectionTimeouts.cpp b/src/IO/ConnectionTimeouts.cpp index 8813c958185..da6214ae477 100644 --- a/src/IO/ConnectionTimeouts.cpp +++ b/src/IO/ConnectionTimeouts.cpp @@ -148,6 +148,7 @@ void setTimeouts(Poco::Net::HTTPClientSession & session, const ConnectionTimeout if (!session.connected()) { session.setKeepAliveTimeout(timeouts.http_keep_alive_timeout); + session.setKeepAliveMaxRequests(int(timeouts.http_keep_alive_max_requests)); } } diff --git a/src/IO/ConnectionTimeouts.h b/src/IO/ConnectionTimeouts.h index 49305f42d85..f497285bd0c 100644 --- a/src/IO/ConnectionTimeouts.h +++ b/src/IO/ConnectionTimeouts.h @@ -35,6 +35,8 @@ struct ConnectionTimeouts Poco::Timespan tcp_keep_alive_timeout = Poco::Timespan(DEFAULT_TCP_KEEP_ALIVE_TIMEOUT, 0); Poco::Timespan http_keep_alive_timeout = Poco::Timespan(DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT, 0); + size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST; + /// Timeouts for HedgedConnections Poco::Timespan hedged_connection_timeout = Poco::Timespan(DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, 0); diff --git a/src/IO/S3/Credentials.h b/src/IO/S3/Credentials.h index 34dc0c1d2bd..8d586223035 100644 --- a/src/IO/S3/Credentials.h +++ b/src/IO/S3/Credentials.h @@ -22,6 +22,8 @@ inline static constexpr uint64_t DEFAULT_EXPIRATION_WINDOW_SECONDS = 120; inline static constexpr uint64_t DEFAULT_CONNECT_TIMEOUT_MS = 1000; inline static constexpr uint64_t DEFAULT_REQUEST_TIMEOUT_MS = 30000; inline static constexpr uint64_t DEFAULT_MAX_CONNECTIONS = 100; +inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_TIMEOUT = 5; +inline static constexpr uint64_t DEFAULT_KEEP_ALIVE_MAX_REQUESTS = 100; /// In GCP metadata service can be accessed via DNS regardless of IPv4 or IPv6. static inline constexpr char GCP_METADATA_SERVICE_ENDPOINT[] = "http://metadata.google.internal"; diff --git a/src/IO/S3/PocoHTTPClient.h b/src/IO/S3/PocoHTTPClient.h index f568eb5ddb8..a0b35e9b4a9 100644 --- a/src/IO/S3/PocoHTTPClient.h +++ b/src/IO/S3/PocoHTTPClient.h @@ -52,6 +52,7 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration /// See PoolBase::BehaviourOnLimit bool s3_use_adaptive_timeouts = true; size_t http_keep_alive_timeout = DEFAULT_HTTP_KEEP_ALIVE_TIMEOUT; + size_t http_keep_alive_max_requests = DEFAULT_HTTP_KEEP_ALIVE_MAX_REQUEST; std::function error_report;