2024-03-03 13:22:40 +00:00
# include <Common/HTTPConnectionPool.h>
# include <Common/HostResolvePool.h>
# include <Common/ProfileEvents.h>
# include <Common/CurrentMetrics.h>
# include <Common/logger_useful.h>
# include <Common/Exception.h>
# include <Common/ErrorCodes.h>
# include <Common/ProxyConfiguration.h>
# include <Common/MemoryTrackerSwitcher.h>
# include <Common/SipHash.h>
2024-03-19 16:04:29 +00:00
# include <Poco/Net/HTTPChunkedStream.h>
2024-03-03 13:22:40 +00:00
# include <Poco/Net/HTTPClientSession.h>
# include <Poco/Net/HTTPFixedLengthStream.h>
2024-03-19 16:04:29 +00:00
# include <Poco/Net/HTTPRequest.h>
# include <Poco/Net/HTTPResponse.h>
# include <Poco/Net/HTTPStream.h>
2024-04-03 13:13:59 +00:00
# include <Poco/Net/NetException.h>
2024-03-03 13:22:40 +00:00
# include <Poco/Timespan.h>
2024-03-19 16:04:29 +00:00
# include <queue>
2024-03-03 13:22:40 +00:00
# include "config.h"
# if USE_SSL
# include <Poco/Net/HTTPSClientSession.h>
# endif
namespace ProfileEvents
{
extern const Event StorageConnectionsCreated ;
extern const Event StorageConnectionsReused ;
extern const Event StorageConnectionsReset ;
extern const Event StorageConnectionsPreserved ;
extern const Event StorageConnectionsExpired ;
extern const Event StorageConnectionsErrors ;
extern const Event StorageConnectionsElapsedMicroseconds ;
extern const Event DiskConnectionsCreated ;
extern const Event DiskConnectionsReused ;
extern const Event DiskConnectionsReset ;
extern const Event DiskConnectionsPreserved ;
extern const Event DiskConnectionsExpired ;
extern const Event DiskConnectionsErrors ;
extern const Event DiskConnectionsElapsedMicroseconds ;
extern const Event HTTPConnectionsCreated ;
extern const Event HTTPConnectionsReused ;
extern const Event HTTPConnectionsReset ;
extern const Event HTTPConnectionsPreserved ;
extern const Event HTTPConnectionsExpired ;
extern const Event HTTPConnectionsErrors ;
extern const Event HTTPConnectionsElapsedMicroseconds ;
}
namespace CurrentMetrics
{
extern const Metric StorageConnectionsStored ;
extern const Metric StorageConnectionsTotal ;
extern const Metric DiskConnectionsStored ;
extern const Metric DiskConnectionsTotal ;
extern const Metric HTTPConnectionsStored ;
extern const Metric HTTPConnectionsTotal ;
}
namespace
{
/// Builds the ProxyConfig structure of Poco::Net::HTTPClientSession from a DB::ProxyConfiguration.
/// Host, port and the tunneling flag are copied as is; both protocol fields are converted
/// with ProxyConfiguration::protocolToString.
Poco::Net::HTTPClientSession::ProxyConfig proxyConfigurationToPocoProxyConfig(const DB::ProxyConfiguration & proxy_configuration)
{
    using DB::ProxyConfiguration;

    Poco::Net::HTTPClientSession::ProxyConfig converted;

    /// Address of the proxy.
    converted.host = proxy_configuration.host;
    converted.port = proxy_configuration.port;

    /// Protocol of the proxy and of the original request.
    converted.protocol = ProxyConfiguration::protocolToString(proxy_configuration.protocol);
    converted.originalRequestProtocol = ProxyConfiguration::protocolToString(proxy_configuration.original_request_protocol);

    converted.tunnel = proxy_configuration.tunneling;

    return converted;
}
2024-04-03 13:13:59 +00:00
constexpr size_t roundUp ( size_t x , size_t rounding )
2024-03-03 13:22:40 +00:00
{
chassert ( rounding > 0 ) ;
2024-04-03 13:13:59 +00:00
return ( x + rounding ) / rounding * rounding ;
2024-03-03 13:22:40 +00:00
}
2024-04-03 13:13:59 +00:00
static_assert ( roundUp ( 10000 , 100 ) = = 10100 ) ;
static_assert ( roundUp ( 10001 , 100 ) = = 10100 ) ;
static_assert ( roundUp ( 10099 , 100 ) = = 10100 ) ;
static_assert ( roundUp ( 10100 , 100 ) = = 10200 ) ;
2024-03-03 13:22:40 +00:00
}
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED ;
extern const int UNSUPPORTED_URI_SCHEME ;
}
/// Returns the set of profile events and current metrics with the `StorageConnections` prefix.
IHTTPConnectionPoolForEndpoint::Metrics getMetricsForStorageConnectionPool()
{
    using Metrics = IHTTPConnectionPoolForEndpoint::Metrics;

    return Metrics{
        /// Profile events (counters).
        .created = ProfileEvents::StorageConnectionsCreated,
        .reused = ProfileEvents::StorageConnectionsReused,
        .reset = ProfileEvents::StorageConnectionsReset,
        .preserved = ProfileEvents::StorageConnectionsPreserved,
        .expired = ProfileEvents::StorageConnectionsExpired,
        .errors = ProfileEvents::StorageConnectionsErrors,
        .elapsed_microseconds = ProfileEvents::StorageConnectionsElapsedMicroseconds,
        /// Current metrics (gauges).
        .stored_count = CurrentMetrics::StorageConnectionsStored,
        .active_count = CurrentMetrics::StorageConnectionsTotal,
    };
}
/// Returns the set of profile events and current metrics with the `DiskConnections` prefix.
IHTTPConnectionPoolForEndpoint::Metrics getMetricsForDiskConnectionPool()
{
    using Metrics = IHTTPConnectionPoolForEndpoint::Metrics;

    return Metrics{
        /// Profile events (counters).
        .created = ProfileEvents::DiskConnectionsCreated,
        .reused = ProfileEvents::DiskConnectionsReused,
        .reset = ProfileEvents::DiskConnectionsReset,
        .preserved = ProfileEvents::DiskConnectionsPreserved,
        .expired = ProfileEvents::DiskConnectionsExpired,
        .errors = ProfileEvents::DiskConnectionsErrors,
        .elapsed_microseconds = ProfileEvents::DiskConnectionsElapsedMicroseconds,
        /// Current metrics (gauges).
        .stored_count = CurrentMetrics::DiskConnectionsStored,
        .active_count = CurrentMetrics::DiskConnectionsTotal,
    };
}
/// Returns the set of profile events and current metrics with the `HTTPConnections` prefix.
IHTTPConnectionPoolForEndpoint::Metrics getMetricsForHTTPConnectionPool()
{
    using Metrics = IHTTPConnectionPoolForEndpoint::Metrics;

    return Metrics{
        /// Profile events (counters).
        .created = ProfileEvents::HTTPConnectionsCreated,
        .reused = ProfileEvents::HTTPConnectionsReused,
        .reset = ProfileEvents::HTTPConnectionsReset,
        .preserved = ProfileEvents::HTTPConnectionsPreserved,
        .expired = ProfileEvents::HTTPConnectionsExpired,
        .errors = ProfileEvents::HTTPConnectionsErrors,
        .elapsed_microseconds = ProfileEvents::HTTPConnectionsElapsedMicroseconds,
        /// Current metrics (gauges).
        .stored_count = CurrentMetrics::HTTPConnectionsStored,
        .active_count = CurrentMetrics::HTTPConnectionsTotal,
    };
}
/// Selects the metrics set that corresponds to the given connection group type.
/// The switch has no default branch on purpose: every enumerator is listed explicitly.
IHTTPConnectionPoolForEndpoint::Metrics getConnectionPoolMetrics(HTTPConnectionGroupType type)
{
    switch (type)
    {
        case HTTPConnectionGroupType::DISK:
            return getMetricsForDiskConnectionPool();

        case HTTPConnectionGroupType::HTTP:
            return getMetricsForHTTPConnectionPool();

        case HTTPConnectionGroupType::STORAGE:
            return getMetricsForStorageConnectionPool();
    }
}
class ConnectionGroup
{
public :
using Ptr = std : : shared_ptr < ConnectionGroup > ;
explicit ConnectionGroup ( HTTPConnectionGroupType type_ ) : type ( type_ ) , metrics ( getConnectionPoolMetrics ( type_ ) ) { }
void setLimits ( HTTPConnectionPools : : Limits limits_ )
{
std : : lock_guard lock ( mutex ) ;
limits = std : : move ( limits_ ) ;
mute_warning_until = 0 ;
}
bool isSoftLimitReached ( ) const
{
std : : lock_guard lock ( mutex ) ;
return total_connections_in_group > = limits . soft_limit ;
}
bool isStoreLimitReached ( ) const
{
std : : lock_guard lock ( mutex ) ;
return total_connections_in_group > = limits . store_limit ;
}
2024-04-04 13:57:42 +00:00
size_t getStored ( ) const
{
std : : lock_guard lock ( mutex ) ;
return total_connections_in_group ;
}
size_t getStoreLimit ( ) const
{
std : : lock_guard lock ( mutex ) ;
return limits . store_limit ;
}
2024-03-03 13:22:40 +00:00
void atConnectionCreate ( )
{
std : : lock_guard lock ( mutex ) ;
+ + total_connections_in_group ;
if ( total_connections_in_group > = limits . warning_limit & & total_connections_in_group > = mute_warning_until )
{
mute_warning_until = roundUp ( total_connections_in_group , limits . warning_step ) ;
2024-04-03 13:13:59 +00:00
LOG_WARNING ( log , " Too many active sessions in group {}, count {}, warning limit {}, next warning at {} " ,
type , total_connections_in_group , limits . warning_limit , mute_warning_until ) ;
2024-03-03 13:22:40 +00:00
}
}
void atConnectionDestroy ( )
{
std : : lock_guard lock ( mutex ) ;
- - total_connections_in_group ;
2024-04-03 13:13:59 +00:00
const size_t reduced_warning_limit = limits . warning_limit > 10 ? limits . warning_limit - 20 : 1 ;
2024-03-03 13:22:40 +00:00
if ( mute_warning_until > 0 & & total_connections_in_group < reduced_warning_limit )
{
LOG_WARNING ( log , " Sessions count is OK in the group {}, count {} " , type , total_connections_in_group ) ;
mute_warning_until = 0 ;
}
}
HTTPConnectionGroupType getType ( ) const { return type ; }
const IHTTPConnectionPoolForEndpoint : : Metrics & getMetrics ( ) const { return metrics ; }
private :
const HTTPConnectionGroupType type ;
const IHTTPConnectionPoolForEndpoint : : Metrics metrics ;
LoggerPtr log = getLogger ( " ConnectionGroup " ) ;
mutable std : : mutex mutex ;
HTTPConnectionPools : : Limits limits TSA_GUARDED_BY ( mutex ) = HTTPConnectionPools : : Limits ( ) ;
size_t total_connections_in_group TSA_GUARDED_BY ( mutex ) = 0 ;
size_t mute_warning_until TSA_GUARDED_BY ( mutex ) = 0 ;
} ;
/// Extension of the per-endpoint pool interface with the operations
/// that HTTPConnectionPools::Impl needs for maintenance of its pools.
class IExtendedPool : public IHTTPConnectionPoolForEndpoint
{
public:
    using Ptr = std::shared_ptr<IExtendedPool>;

    /// Type of the connection group the pool belongs to.
    virtual HTTPConnectionGroupType getGroupType() const = 0;

    /// Removes expired connections from the pool. The returned number is used by the caller
    /// as the count of connections left in the pool.
    virtual size_t wipeExpired() = 0;
};
// EndpointConnectionPool manages connections to the endpoint.
// Features:
// - it uses HostResolver for address selection. See Common/HostResolver.h for more info.
// - it minimizes the number of `Session::connect()`/`Session::reconnect()` calls
// - it stores only connected and ready-to-use sessions
// - a connection can be reused even when limits are reached
// - soft limit, warn limit, store limit
// - `Session::reconnect()` uses the pool as well
// - comprehensive sensors
// - a session is reused according to its inner state, automatically
template < class Session >
class EndpointConnectionPool : public std : : enable_shared_from_this < EndpointConnectionPool < Session > > , public IExtendedPool
{
private :
friend class HTTPConnectionPools ;
using WeakPtr = std : : weak_ptr < EndpointConnectionPool < Session > > ;
class PooledConnection : public Session
{
public :
using Ptr = std : : shared_ptr < PooledConnection > ;
2024-04-03 13:13:59 +00:00
using Session : : mustReconnect ;
void markAsExpired ( )
{
isExpired = true ;
}
2024-03-03 13:22:40 +00:00
void reconnect ( ) override
{
Session : : close ( ) ;
if ( auto lock = pool . lock ( ) )
{
auto timeouts = getTimeouts ( * this ) ;
auto new_connection = lock - > getConnection ( timeouts ) ;
Session : : assign ( * new_connection ) ;
}
else
{
auto timer = CurrentThread : : getProfileEvents ( ) . timer ( metrics . elapsed_microseconds ) ;
Session : : reconnect ( ) ;
ProfileEvents : : increment ( metrics . created ) ;
}
}
/// Human readable description of the session target for log messages:
/// "host:port", followed by the proxy host when a proxy is configured.
String getTarget() const
{
    const bool has_proxy = !Session::getProxyConfig().host.empty();

    if (!has_proxy)
        return fmt::format("{}:{}", Session::getHost(), Session::getPort());

    return fmt::format("{}:{} over proxy {}", Session::getHost(), Session::getPort(), Session::getProxyConfig().host);
}
2024-04-04 17:29:42 +00:00
/// Time elapsed between the last request of the session (Session::getLastRequest) and now.
Poco::Timespan idleTime()
{
    const Poco::Timestamp current_time;
    const auto last_request_time = Session::getLastRequest();
    return current_time - last_request_time;
}
2024-03-03 13:22:40 +00:00
void flushRequest ( ) override
{
if ( bool ( request_stream ) )
{
request_stream - > flush ( ) ;
if ( auto * fixed_steam = dynamic_cast < Poco : : Net : : HTTPFixedLengthOutputStream * > ( request_stream ) )
{
request_stream_completed = fixed_steam - > isComplete ( ) ;
}
else if ( auto * chunked_steam = dynamic_cast < Poco : : Net : : HTTPChunkedOutputStream * > ( request_stream ) )
{
chunked_steam - > rdbuf ( ) - > close ( ) ;
request_stream_completed = chunked_steam - > isComplete ( ) ;
}
else if ( auto * http_stream = dynamic_cast < Poco : : Net : : HTTPOutputStream * > ( request_stream ) )
{
request_stream_completed = http_stream - > isComplete ( ) ;
}
else
{
request_stream_completed = false ;
}
}
request_stream = nullptr ;
Session : : flushRequest ( ) ;
}
2024-04-04 13:57:42 +00:00
/// Concatenates all headers of the message as "<name: value>" items. Used in debug log messages.
String printAllHeaders(Poco::Net::HTTPMessage & message) const
{
    String headers_dump;
    headers_dump.reserve(300);

    for (auto & [name, value] : message)
        headers_dump += fmt::format("<{}: {}>", name, value);

    return headers_dump;
}
2024-03-03 13:22:40 +00:00
std : : ostream & sendRequest ( Poco : : Net : : HTTPRequest & request ) override
{
2024-04-04 17:29:42 +00:00
auto idle = idleTime ( ) ;
2024-03-03 13:22:40 +00:00
std : : ostream & result = Session : : sendRequest ( request ) ;
result . exceptions ( std : : ios : : badbit ) ;
2024-04-04 13:57:42 +00:00
// that line is for temporary debug, will be removed
2024-04-04 17:29:42 +00:00
LOG_INFO ( log , " Send request to {} with: version {}, method {}, usage count {}, keep-alive timeout={}, last usage ago: {}ms, headers: {} " ,
request . getVersion ( ) ,
request . getMethod ( ) ,
2024-04-04 13:57:42 +00:00
getTarget ( ) ,
usage_cnt ,
Session : : getKeepAliveTimeout ( ) . totalSeconds ( ) ,
2024-04-04 17:29:42 +00:00
idle . totalMilliseconds ( ) ,
2024-04-04 13:57:42 +00:00
printAllHeaders ( request ) ) ;
2024-03-03 13:22:40 +00:00
request_stream = & result ;
request_stream_completed = false ;
response_stream = nullptr ;
2024-03-07 12:56:51 +00:00
response_stream_completed = false ;
2024-03-03 13:22:40 +00:00
return result ;
}
std : : istream & receiveResponse ( Poco : : Net : : HTTPResponse & response ) override
{
2024-04-03 14:02:07 +00:00
int originKA = Session : : getKeepAliveTimeout ( ) . totalSeconds ( ) ;
2024-03-03 13:22:40 +00:00
std : : istream & result = Session : : receiveResponse ( response ) ;
result . exceptions ( std : : ios : : badbit ) ;
2024-04-03 13:13:59 +00:00
// that line is for temporary debug, will be removed
2024-04-04 17:29:42 +00:00
LOG_INFO ( log , " Received response from {} with: version {}, code {}, usage count {}, keep alive header: {}, original ka {}, last usage ago: {}ms, headers: {} " ,
getTarget ( ) ,
response . getVersion ( ) ,
int ( response . getStatus ( ) ) ,
usage_cnt ,
response . get ( Poco : : Net : : HTTPMessage : : CONNECTION_KEEP_ALIVE , Poco : : Net : : HTTPMessage : : EMPTY ) ,
originKA ,
idleTime ( ) . totalMilliseconds ( ) ,
printAllHeaders ( response ) ) ;
2024-04-03 13:13:59 +00:00
2024-03-03 13:22:40 +00:00
response_stream = & result ;
response_stream_completed = false ;
return result ;
}
void reset ( ) override
{
request_stream = nullptr ;
request_stream_completed = false ;
response_stream = nullptr ;
response_stream_completed = false ;
Session : : reset ( ) ;
}
/// Finishes the accounting of the connection:
/// - records whether the response has been read completely,
/// - unregisters the connection from the group,
/// - offers the connection to the pool (the pool decides whether it is stored or reset),
///   unless the connection has been marked as expired or the pool is already gone,
/// - decrements the metric of active connections.
~PooledConnection() override
{
    if (bool(response_stream))
    {
        /// The completeness check depends on the concrete type of the input stream.
        if (auto * fixed_steam = dynamic_cast<Poco::Net::HTTPFixedLengthInputStream *>(response_stream))
        {
            response_stream_completed = fixed_steam->isComplete();
        }
        else if (auto * chunked_steam = dynamic_cast<Poco::Net::HTTPChunkedInputStream *>(response_stream))
        {
            response_stream_completed = chunked_steam->isComplete();
        }
        else if (auto * http_stream = dynamic_cast<Poco::Net::HTTPInputStream *>(response_stream))
        {
            response_stream_completed = http_stream->isComplete();
        }
        else
        {
            response_stream_completed = false;
        }
    }
    response_stream = nullptr;

    group->atConnectionDestroy();

    if (!isExpired)
    {
        if (auto lock = pool.lock())
            lock->atConnectionDestroy(*this);
    }
    else
    {
        LOG_INFO(log, "Expired connection to {} with: usage count {}, keep alive timeout: {}, last usage ago: {}s",
            getTarget(),
            usage_cnt,
            Session::getKeepAliveTimeout().totalSeconds(),
            idleTime().totalSeconds());
    }

    CurrentMetrics::sub(metrics.active_count);
}
private :
friend class EndpointConnectionPool ;
template < class . . . Args >
2024-04-03 13:13:59 +00:00
explicit PooledConnection ( EndpointConnectionPool : : WeakPtr pool_ , ConnectionGroup : : Ptr group_ , IHTTPConnectionPoolForEndpoint : : Metrics metrics_ , Args & & . . . args )
: Session ( args . . . ) , pool ( std : : move ( pool_ ) ) , group ( group_ ) , metrics ( std : : move ( metrics_ ) )
2024-03-03 13:22:40 +00:00
{
CurrentMetrics : : add ( metrics . active_count ) ;
2024-04-03 13:13:59 +00:00
group - > atConnectionCreate ( ) ;
2024-03-03 13:22:40 +00:00
}
/// Factory for PooledConnection. The constructor is private, so std::make_shared
/// is given a local derived type that exposes it.
template <class... Args>
static Ptr create(Args &&... args)
{
    /// Pool is global, we shouldn't attribute this memory to query/user.
    MemoryTrackerSwitcher switcher{&total_memory_tracker};

    struct ConstructibleConnection : public PooledConnection
    {
        explicit ConstructibleConnection(Args &&... ctor_args) : PooledConnection(std::forward<Args>(ctor_args)...) { }
    };

    return std::make_shared<ConstructibleConnection>(std::forward<Args>(args)...);
}
void doConnect ( )
{
Session : : reconnect ( ) ;
}
/// True when both the request and the response of the last exchange have been processed completely.
bool isCompleted() const
{
    if (!request_stream_completed)
        return false;

    return response_stream_completed;
}
2024-04-03 13:13:59 +00:00
EndpointConnectionPool : : WeakPtr pool ;
ConnectionGroup : : Ptr group ;
2024-03-03 13:22:40 +00:00
IHTTPConnectionPoolForEndpoint : : Metrics metrics ;
2024-04-03 13:13:59 +00:00
bool isExpired = false ;
2024-04-04 13:57:42 +00:00
size_t usage_cnt = 1 ;
2024-04-04 17:29:42 +00:00
size_t exception_level = std : : uncaught_exceptions ( ) ;
2024-03-03 13:22:40 +00:00
2024-04-03 13:13:59 +00:00
LoggerPtr log = getLogger ( " PooledConnection " ) ;
2024-03-03 13:22:40 +00:00
std : : ostream * request_stream = nullptr ;
std : : istream * response_stream = nullptr ;
bool request_stream_completed = true ;
bool response_stream_completed = true ;
} ;
using Connection = PooledConnection ;
using ConnectionPtr = PooledConnection : : Ptr ;
/// Comparator for the heap of stored connections. Connections are compared by
/// "time of the last request + keep-alive timeout"; with the "greater" comparison
/// the connection with the smallest value is at the top of std::priority_queue.
struct GreaterByLastRequest
{
    static bool operator()(const ConnectionPtr & l, const ConnectionPtr & r)
    {
        const auto left_deadline = l->getLastRequest() + l->getKeepAliveTimeout();
        const auto right_deadline = r->getLastRequest() + r->getKeepAliveTimeout();
        return left_deadline > right_deadline;
    }
};
using ConnectionsMinHeap = std : : priority_queue < ConnectionPtr , std : : vector < ConnectionPtr > , GreaterByLastRequest > ;
public :
/// Factory for EndpointConnectionPool. The constructor is private, so std::make_shared
/// is given a local derived type that exposes it.
template <class... Args>
static Ptr create(Args &&... args)
{
    struct ConstructiblePool : public EndpointConnectionPool<Session>
    {
        explicit ConstructiblePool(Args &&... ctor_args) : EndpointConnectionPool<Session>(std::forward<Args>(ctor_args)...) { }
    };

    return std::make_shared<ConstructiblePool>(std::forward<Args>(args)...);
}
/// The connections that are still stored in the pool go away together with it,
/// so they are subtracted from the metric of stored connections.
~EndpointConnectionPool() override
{
    const auto & group_metrics = group->getMetrics();
    CurrentMetrics::sub(group_metrics.stored_count, stored_connections.size());
}
/// Human readable description of the endpoint for log messages:
/// the host, followed by the proxy host when a proxy is configured.
String getTarget() const
{
    if (proxy_configuration.isEmpty())
        return host;

    return fmt::format("{} over proxy {}", host, proxy_configuration.host);
}
/// Returns a connection to the endpoint with the given timeouts applied.
/// Expired stored connections are wiped first. If a stored connection is left, the one at the top
/// of the heap is reused; otherwise a new connection is created and connected.
IHTTPConnectionPoolForEndpoint::ConnectionPtr getConnection(const ConnectionTimeouts & timeouts) override
{
    /// Lives outside of the locked scope: the expired sessions are released after `mutex` is unlocked.
    std::vector<ConnectionPtr> expired_connections;

    SCOPE_EXIT({
        /// The same memory tracker is used as for the creation of the sessions.
        MemoryTrackerSwitcher switcher{&total_memory_tracker};
        expired_connections.clear();
    });

    {
        std::lock_guard lock(mutex);

        expired_connections.reserve(stored_connections.size());
        wipeExpiredImpl(expired_connections);

        if (!stored_connections.empty())
        {
            auto reused_connection = stored_connections.top();
            stored_connections.pop();

            setTimeouts(*reused_connection, timeouts);
            reused_connection->usage_cnt += 1;

            ProfileEvents::increment(getMetrics().reused, 1);
            CurrentMetrics::sub(getMetrics().stored_count, 1);

            return reused_connection;
        }
    }

    /// Nothing to reuse. The new connection is established without holding the lock.
    return prepareNewConnection(timeouts);
}
/// The pool has no metrics of its own: it reports into the metrics of its connection group.
const IHTTPConnectionPoolForEndpoint::Metrics & getMetrics() const override
{
    const auto & group_metrics = group->getMetrics();
    return group_metrics;
}
/// Type of the connection group this pool belongs to.
HTTPConnectionGroupType getGroupType() const override
{
    const HTTPConnectionGroupType group_type = group->getType();
    return group_type;
}
size_t wipeExpired ( ) override
{
std : : vector < ConnectionPtr > expired_connections ;
SCOPE_EXIT ( {
MemoryTrackerSwitcher switcher { & total_memory_tracker } ;
expired_connections . clear ( ) ;
} ) ;
std : : lock_guard lock ( mutex ) ;
2024-04-03 13:13:59 +00:00
return wipeExpiredImpl ( expired_connections ) ;
2024-03-03 13:22:40 +00:00
}
2024-04-03 13:13:59 +00:00
size_t wipeExpiredImpl ( std : : vector < ConnectionPtr > & expired_connections ) TSA_REQUIRES ( mutex )
2024-03-03 13:22:40 +00:00
{
2024-04-03 13:33:35 +00:00
SCOPE_EXIT ( {
CurrentMetrics : : sub ( getMetrics ( ) . stored_count , expired_connections . size ( ) ) ;
ProfileEvents : : increment ( getMetrics ( ) . expired , expired_connections . size ( ) ) ;
} ) ;
2024-04-03 13:13:59 +00:00
auto isSoftLimitReached = group - > isSoftLimitReached ( ) ;
2024-03-03 13:22:40 +00:00
while ( ! stored_connections . empty ( ) )
{
auto connection = stored_connections . top ( ) ;
2024-04-03 13:13:59 +00:00
if ( ! isExpired ( connection , isSoftLimitReached ) )
2024-03-03 13:22:40 +00:00
return stored_connections . size ( ) ;
stored_connections . pop ( ) ;
2024-04-03 13:13:59 +00:00
connection - > markAsExpired ( ) ;
2024-03-03 13:22:40 +00:00
expired_connections . push_back ( connection ) ;
}
return stored_connections . size ( ) ;
}
private :
/// Only remembers the endpoint description and the group. No connection is created here.
/// Private: instances are made with create().
EndpointConnectionPool(
    ConnectionGroup::Ptr group_,
    String host_,
    UInt16 port_,
    bool https_,
    ProxyConfiguration proxy_configuration_)
    : host(std::move(host_))
    , port(port_)
    , https(https_)
    , proxy_configuration(std::move(proxy_configuration_))
    , group(group_)
{
}
/// Weak reference to this pool. It is handed to the connections the pool creates.
WeakPtr getWeakFromThis()
{
    return EndpointConnectionPool::weak_from_this();
}
2024-04-03 13:13:59 +00:00
/// Checks the keep-alive of a stored connection. The value passed to isKeepAliveExpired
/// is 0.1 when the soft limit of the group is reached and 0.8 otherwise.
bool isExpired(ConnectionPtr connection, bool isSoftLimitReached) TSA_REQUIRES(mutex)
{
    const double keep_alive_factor = isSoftLimitReached ? 0.1 : 0.8;
    return connection->isKeepAliveExpired(keep_alive_factor);
}
/// Creates a session object for the endpoint with keep-alive switched on
/// and with the proxy settings applied (if any). The session is not connected here.
ConnectionPtr allocateNewConnection()
{
    ConnectionPtr new_session = PooledConnection::create(this->getWeakFromThis(), group, getMetrics(), host, port);

    new_session->setKeepAlive(true);

    const bool has_proxy = !proxy_configuration.isEmpty();
    if (has_proxy)
        new_session->setProxyConfig(proxyConfigurationToPocoProxyConfig(proxy_configuration));

    return new_session;
}
/// Resolves the host, creates a session with the given timeouts and connects it.
/// On a connect failure the resolved address is marked as failed, the error metric is incremented,
/// the session is reset and the exception is rethrown.
ConnectionPtr prepareNewConnection(const ConnectionTimeouts & timeouts)
{
    auto resolved_address = HostResolversPool::instance().getResolver(host)->resolve();

    auto new_session = allocateNewConnection();
    setTimeouts(*new_session, timeouts);
    new_session->setResolvedHost(*resolved_address);

    try
    {
        /// The time of the connect is accounted in `elapsed_microseconds`.
        auto timer = CurrentThread::getProfileEvents().timer(getMetrics().elapsed_microseconds);
        new_session->doConnect();
    }
    catch (...)
    {
        resolved_address.setFail();
        ProfileEvents::increment(getMetrics().errors);
        new_session->reset();
        throw;
    }

    ProfileEvents::increment(getMetrics().created);
    return new_session;
}
/// Called from the destructor of a connection that has not been marked as expired.
/// Decides whether the underlying session can be kept for reuse.
/// The session is dropped ("reset") when it is not connected, must reconnect, its last exchange
/// has not been completed, it has buffered data, or the store limit of the group is reached.
/// Otherwise the session state is moved into a new connection object which is put into the heap.
void atConnectionDestroy(PooledConnection & connection)
{
    if (!connection.connected() || connection.mustReconnect() || !connection.isCompleted() || connection.buffered()
        || group->isStoreLimitReached())
    {
        /// Number of exceptions that have become in flight since the connection was created.
        /// Computed in signed arithmetic: an unsigned difference wraps around to a huge number.
        const Int64 new_uncaught_exceptions
            = static_cast<Int64>(std::uncaught_exceptions()) - static_cast<Int64>(connection.exception_level);

        LOG_INFO(getLogger("PooledConnection"),
            "Reset connection to {} with: usage count {}, keep alive timeout: {}, connected {}, must recon {}, last usage ago: {}, is completed {}, store limit reached {} as {}/{}, there is exception {}",
            getTarget(),
            connection.usage_cnt,
            connection.getKeepAliveTimeout().totalSeconds(),
            connection.connected(),
            connection.mustReconnect(),
            connection.idleTime().totalSeconds(),
            connection.isCompleted(),
            group->isStoreLimitReached(), group->getStored(), group->getStoreLimit(),
            new_uncaught_exceptions);

        ProfileEvents::increment(getMetrics().reset, 1);
        return;
    }

    /// `connection` is being destroyed, so its session state is transferred into a new object.
    auto connection_to_store = allocateNewConnection();
    connection_to_store->assign(connection);
    connection_to_store->usage_cnt = connection.usage_cnt;

    {
        /// Pool is global, the memory of the heap is not attributed to the query/user.
        MemoryTrackerSwitcher switcher{&total_memory_tracker};
        std::lock_guard lock(mutex);
        stored_connections.push(connection_to_store);
    }

    CurrentMetrics::add(getMetrics().stored_count, 1);
    ProfileEvents::increment(getMetrics().preserved, 1);
}
const std : : string host ;
const UInt16 port ;
const bool https ;
const ProxyConfiguration proxy_configuration ;
const ConnectionGroup : : Ptr group ;
std : : mutex mutex ;
ConnectionsMinHeap stored_connections TSA_GUARDED_BY ( mutex ) ;
} ;
struct EndpointPoolKey
{
HTTPConnectionGroupType connection_group ;
String target_host ;
UInt16 target_port ;
bool is_target_https ;
ProxyConfiguration proxy_config ;
bool operator = = ( const EndpointPoolKey & rhs ) const
{
return std : : tie (
connection_group ,
target_host ,
target_port ,
is_target_https ,
proxy_config . host ,
proxy_config . port ,
proxy_config . protocol ,
proxy_config . tunneling ,
proxy_config . original_request_protocol )
= = std : : tie (
rhs . connection_group ,
rhs . target_host ,
rhs . target_port ,
rhs . is_target_https ,
rhs . proxy_config . host ,
rhs . proxy_config . port ,
rhs . proxy_config . protocol ,
rhs . proxy_config . tunneling ,
rhs . proxy_config . original_request_protocol ) ;
}
} ;
struct Hasher
{
size_t operator ( ) ( const EndpointPoolKey & k ) const
{
SipHash s ;
s . update ( k . connection_group ) ;
s . update ( k . target_host ) ;
s . update ( k . target_port ) ;
s . update ( k . is_target_https ) ;
s . update ( k . proxy_config . host ) ;
s . update ( k . proxy_config . port ) ;
s . update ( k . proxy_config . protocol ) ;
s . update ( k . proxy_config . tunneling ) ;
s . update ( k . proxy_config . original_request_protocol ) ;
return s . get64 ( ) ;
}
} ;
/// Creates the pool for an endpoint. A plain HTTP session type is used for non-secure endpoints
/// and an HTTPS session type for secure ones.
/// Throws SUPPORT_IS_DISABLED for a secure endpoint when the build has no SSL support.
IExtendedPool::Ptr
createConnectionPool(ConnectionGroup::Ptr group, std::string host, UInt16 port, bool secure, ProxyConfiguration proxy_configuration)
{
    if (!secure)
    {
        return EndpointConnectionPool<Poco::Net::HTTPClientSession>::create(
            group, std::move(host), port, secure, std::move(proxy_configuration));
    }

#if USE_SSL
    return EndpointConnectionPool<Poco::Net::HTTPSClientSession>::create(
        group, std::move(host), port, secure, std::move(proxy_configuration));
#else
    throw Exception(
        ErrorCodes::SUPPORT_IS_DISABLED, "Inter-server secret support is disabled, because ClickHouse was built without SSL library");
#endif
}
class HTTPConnectionPools : : Impl
{
private :
2024-04-03 13:13:59 +00:00
const size_t DEFAULT_WIPE_TIMEOUT_SECONDS = 10 * 60 ;
2024-03-03 13:22:40 +00:00
const Poco : : Timespan wipe_timeout = Poco : : Timespan ( DEFAULT_WIPE_TIMEOUT_SECONDS , 0 ) ;
ConnectionGroup : : Ptr disk_group = std : : make_shared < ConnectionGroup > ( HTTPConnectionGroupType : : DISK ) ;
ConnectionGroup : : Ptr storage_group = std : : make_shared < ConnectionGroup > ( HTTPConnectionGroupType : : STORAGE ) ;
ConnectionGroup : : Ptr http_group = std : : make_shared < ConnectionGroup > ( HTTPConnectionGroupType : : HTTP ) ;
/// If multiple mutexes are held simultaneously,
/// they should be locked in this order:
/// HTTPConnectionPools::mutex, then EndpointConnectionPool::mutex, then ConnectionGroup::mutex.
std : : mutex mutex ;
std : : unordered_map < EndpointPoolKey , IExtendedPool : : Ptr , Hasher > endpoints_pool TSA_GUARDED_BY ( mutex ) ;
Poco : : Timestamp last_wipe_time TSA_GUARDED_BY ( mutex ) ;
public :
IHTTPConnectionPoolForEndpoint : : Ptr getPool ( HTTPConnectionGroupType type , const Poco : : URI & uri , const ProxyConfiguration & proxy_configuration )
{
Poco : : Timestamp now ;
std : : lock_guard lock ( mutex ) ;
if ( now - last_wipe_time > wipe_timeout )
{
wipeExpired ( ) ;
last_wipe_time = now ;
}
return getPoolImpl ( type , uri , proxy_configuration ) ;
}
void setLimits ( HTTPConnectionPools : : Limits disk , HTTPConnectionPools : : Limits storage , HTTPConnectionPools : : Limits http )
{
disk_group - > setLimits ( std : : move ( disk ) ) ;
storage_group - > setLimits ( std : : move ( storage ) ) ;
http_group - > setLimits ( std : : move ( http ) ) ;
}
void dropCache ( )
{
std : : lock_guard lock ( mutex ) ;
endpoints_pool . clear ( ) ;
}
protected :
ConnectionGroup : : Ptr & getGroup ( HTTPConnectionGroupType type )
{
switch ( type )
{
case HTTPConnectionGroupType : : DISK :
return disk_group ;
case HTTPConnectionGroupType : : STORAGE :
return storage_group ;
case HTTPConnectionGroupType : : HTTP :
return http_group ;
}
}
IExtendedPool : : Ptr getPoolImpl ( HTTPConnectionGroupType type , const Poco : : URI & uri , const ProxyConfiguration & proxy_configuration )
TSA_REQUIRES ( mutex )
{
auto [ host , port , secure ] = getHostPortSecure ( uri , proxy_configuration ) ;
auto key = EndpointPoolKey { type , host , port , secure , proxy_configuration } ;
auto it = endpoints_pool . find ( key ) ;
if ( it ! = endpoints_pool . end ( ) )
return it - > second ;
it = endpoints_pool . emplace ( key , createConnectionPool ( getGroup ( type ) , std : : move ( host ) , port , secure , proxy_configuration ) ) . first ;
return it - > second ;
}
void wipeExpired ( ) TSA_REQUIRES ( mutex )
{
std : : vector < EndpointPoolKey > keys_to_drop ;
for ( auto & [ key , pool ] : endpoints_pool )
{
auto left_connections = pool - > wipeExpired ( ) ;
if ( left_connections = = 0 & & pool - > getGroupType ( ) ! = HTTPConnectionGroupType : : DISK )
keys_to_drop . push_back ( key ) ;
}
for ( const auto & key : keys_to_drop )
endpoints_pool . erase ( key ) ;
}
static bool useSecureConnection ( const Poco : : URI & uri , const ProxyConfiguration & proxy_configuration )
{
if ( uri . getScheme ( ) = = " http " )
return false ;
if ( uri . getScheme ( ) ! = " https " )
throw Exception ( ErrorCodes : : UNSUPPORTED_URI_SCHEME , " Unsupported scheme in URI '{}' " , uri . toString ( ) ) ;
if ( ! proxy_configuration . isEmpty ( ) )
{
if ( ProxyConfiguration : : Protocol : : HTTP = = proxy_configuration . protocol & & ! proxy_configuration . tunneling )
{
// If it is an HTTPS request, proxy server is HTTP and user opted for tunneling off, we must not create an HTTPS request.
// The desired flow is: HTTP request to the proxy server, then proxy server will initiate an HTTPS request to the target server.
// There is a weak link in the security, but that's what the user opted for.
return false ;
}
}
return true ;
}
static std : : tuple < std : : string , UInt16 , bool > getHostPortSecure ( const Poco : : URI & uri , const ProxyConfiguration & proxy_configuration )
{
return std : : make_tuple ( uri . getHost ( ) , uri . getPort ( ) , useSecureConnection ( uri , proxy_configuration ) ) ;
}
} ;
/// Creates the implementation object; all the work is delegated to it.
HTTPConnectionPools::HTTPConnectionPools()
    : impl(std::make_unique<HTTPConnectionPools::Impl>())
{
}
/// Access to the single process-wide object. It is a function-local static,
/// so it is created on the first call.
HTTPConnectionPools & HTTPConnectionPools::instance()
{
    static HTTPConnectionPools the_instance;
    return the_instance;
}
void HTTPConnectionPools : : setLimits ( HTTPConnectionPools : : Limits disk , HTTPConnectionPools : : Limits storage , HTTPConnectionPools : : Limits http )
{
impl - > setLimits ( std : : move ( disk ) , std : : move ( storage ) , std : : move ( http ) ) ;
}
void HTTPConnectionPools : : dropCache ( )
{
impl - > dropCache ( ) ;
}
/// Returns the connection pool for the endpoint given by the group type, the URI and the proxy settings.
/// Forwards to the implementation.
IHTTPConnectionPoolForEndpoint::Ptr
HTTPConnectionPools::getPool(HTTPConnectionGroupType type, const Poco::URI & uri, const ProxyConfiguration & proxy_configuration)
{
    auto & implementation = *impl;
    return implementation.getPool(type, uri, proxy_configuration);
}
}