2021-02-17 17:34:52 +00:00
# include <Client/ConnectionEstablisher.h>
# include <Common/quoteString.h>
# include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event DistributedConnectionMissingTable ;
extern const Event DistributedConnectionStaleReplica ;
}
namespace DB
{
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF ;
extern const int NETWORK_ERROR ;
extern const int SOCKET_TIMEOUT ;
}
ConnectionEstablisher : : ConnectionEstablisher (
IConnectionPool * pool_ ,
const ConnectionTimeouts * timeouts_ ,
const Settings * settings_ ,
2021-02-21 14:03:24 +00:00
Poco : : Logger * log_ ,
2021-02-21 21:59:07 +00:00
const QualifiedTableName * table_to_check_ )
: pool ( pool_ ) , timeouts ( timeouts_ ) , settings ( settings_ ) , log ( log_ ) , table_to_check ( table_to_check_ ) , is_finished ( false )
2021-02-21 14:03:24 +00:00
{
}
void ConnectionEstablisher : : run ( ConnectionEstablisher : : TryResult & result , std : : string & fail_message )
{
is_finished = false ;
SCOPE_EXIT ( is_finished = true ) ;
try
{
result . entry = pool - > get ( * timeouts , settings , /* force_connected = */ false ) ;
AsyncCallbackSetter async_setter ( & * result . entry , std : : move ( async_callback ) ) ;
UInt64 server_revision = 0 ;
if ( table_to_check )
server_revision = result . entry - > getServerRevision ( * timeouts ) ;
if ( ! table_to_check | | server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS )
{
result . entry - > forceConnected ( * timeouts ) ;
result . is_usable = true ;
result . is_up_to_date = true ;
return ;
}
/// Only status of the remote table corresponding to the Distributed table is taken into account.
/// TODO: request status for joined tables also.
TablesStatusRequest status_request ;
status_request . tables . emplace ( * table_to_check ) ;
TablesStatusResponse status_response = result . entry - > getTablesStatus ( * timeouts , status_request ) ;
auto table_status_it = status_response . table_states_by_id . find ( * table_to_check ) ;
if ( table_status_it = = status_response . table_states_by_id . end ( ) )
{
const char * message_pattern = " There is no table {}.{} on server: {} " ;
fail_message = fmt : : format ( message_pattern , backQuote ( table_to_check - > database ) , backQuote ( table_to_check - > table ) , result . entry - > getDescription ( ) ) ;
LOG_WARNING ( log , fail_message ) ;
ProfileEvents : : increment ( ProfileEvents : : DistributedConnectionMissingTable ) ;
return ;
}
result . is_usable = true ;
UInt64 max_allowed_delay = settings ? UInt64 ( settings - > max_replica_delay_for_distributed_queries ) : 0 ;
if ( ! max_allowed_delay )
{
result . is_up_to_date = true ;
return ;
}
UInt32 delay = table_status_it - > second . absolute_delay ;
if ( delay < max_allowed_delay )
result . is_up_to_date = true ;
else
{
result . is_up_to_date = false ;
result . staleness = delay ;
LOG_TRACE ( log , " Server {} has unacceptable replica delay for table {}.{}: {} " , result . entry - > getDescription ( ) , table_to_check - > database , table_to_check - > table , delay ) ;
ProfileEvents : : increment ( ProfileEvents : : DistributedConnectionStaleReplica ) ;
}
}
catch ( const Exception & e )
{
if ( e . code ( ) ! = ErrorCodes : : NETWORK_ERROR & & e . code ( ) ! = ErrorCodes : : SOCKET_TIMEOUT
& & e . code ( ) ! = ErrorCodes : : ATTEMPT_TO_READ_AFTER_EOF )
throw ;
fail_message = getCurrentExceptionMessage ( /* with_stacktrace = */ false ) ;
if ( ! result . entry . isNull ( ) )
{
result . entry - > disconnect ( ) ;
result . reset ( ) ;
}
}
}
2021-02-22 07:52:19 +00:00
# if defined(OS_LINUX)
2021-02-21 14:03:24 +00:00
ConnectionEstablisherAsync : : ConnectionEstablisherAsync (
IConnectionPool * pool_ ,
const ConnectionTimeouts * timeouts_ ,
const Settings * settings_ ,
Poco : : Logger * log_ ,
2021-02-17 17:34:52 +00:00
const QualifiedTableName * table_to_check_ )
2021-02-21 14:03:24 +00:00
: connection_establisher ( pool_ , timeouts_ , settings_ , log_ , table_to_check_ )
2021-02-17 17:34:52 +00:00
{
epoll . add ( receive_timeout . getDescriptor ( ) ) ;
}
2021-02-21 14:03:24 +00:00
void ConnectionEstablisherAsync : : Routine : : ReadCallback : : operator ( ) ( int fd , const Poco : : Timespan & timeout , const std : : string & )
2021-02-17 17:34:52 +00:00
{
2021-02-26 15:53:40 +00:00
/// Check if it's the first time and we need to add socket fd to epoll.
if ( connection_establisher_async . socket_fd = = - 1 )
2021-02-17 17:34:52 +00:00
{
2021-02-21 14:03:24 +00:00
connection_establisher_async . epoll . add ( fd ) ;
connection_establisher_async . socket_fd = fd ;
2021-02-17 17:34:52 +00:00
}
2021-02-21 14:03:24 +00:00
connection_establisher_async . receive_timeout . setRelative ( timeout ) ;
2021-02-17 17:34:52 +00:00
fiber = std : : move ( fiber ) . resume ( ) ;
2021-02-21 14:03:24 +00:00
connection_establisher_async . receive_timeout . reset ( ) ;
2021-02-17 17:34:52 +00:00
}
2021-02-21 14:03:24 +00:00
Fiber ConnectionEstablisherAsync : : Routine : : operator ( ) ( Fiber & & sink )
2021-02-17 17:34:52 +00:00
{
try
{
2021-02-21 14:03:24 +00:00
connection_establisher_async . connection_establisher . setAsyncCallback ( ReadCallback { connection_establisher_async , sink } ) ;
connection_establisher_async . connection_establisher . run ( connection_establisher_async . result , connection_establisher_async . fail_message ) ;
2021-02-17 17:34:52 +00:00
}
catch ( const boost : : context : : detail : : forced_unwind & )
{
/// This exception is thrown by fiber implementation in case if fiber is being deleted but hasn't exited
/// It should not be caught or it will segfault.
/// Other exceptions must be caught
throw ;
}
catch ( . . . )
{
2021-02-21 14:03:24 +00:00
connection_establisher_async . exception = std : : current_exception ( ) ;
2021-02-17 17:34:52 +00:00
}
return std : : move ( sink ) ;
}
2021-02-21 14:03:24 +00:00
std : : variant < int , ConnectionEstablisher : : TryResult > ConnectionEstablisherAsync : : resume ( )
2021-02-17 17:34:52 +00:00
{
if ( ! fiber_created )
{
reset ( ) ;
fiber = boost : : context : : fiber ( std : : allocator_arg_t ( ) , fiber_stack , Routine { * this } ) ;
fiber_created = true ;
2021-02-21 14:03:24 +00:00
} else if ( ! checkReceiveTimeout ( ) )
return result ;
fiber = std : : move ( fiber ) . resume ( ) ;
if ( exception )
std : : rethrow_exception ( std : : move ( exception ) ) ;
if ( connection_establisher . isFinished ( ) )
{
destroyFiber ( ) ;
return result ;
2021-02-17 17:34:52 +00:00
}
2021-02-21 14:03:24 +00:00
return epoll . getFileDescriptor ( ) ;
}
bool ConnectionEstablisherAsync : : checkReceiveTimeout ( )
{
2021-02-17 17:34:52 +00:00
bool is_socket_ready = false ;
bool is_receive_timeout_alarmed = false ;
epoll_event events [ 2 ] ;
2021-02-18 11:21:48 +00:00
events [ 0 ] . data . fd = events [ 1 ] . data . fd = - 1 ;
2021-02-21 14:03:24 +00:00
size_t ready_count = epoll . getManyReady ( 2 , events , false ) ;
2021-02-17 17:34:52 +00:00
for ( size_t i = 0 ; i ! = ready_count ; + + i )
{
if ( events [ i ] . data . fd = = socket_fd )
is_socket_ready = true ;
if ( events [ i ] . data . fd = = receive_timeout . getDescriptor ( ) )
is_receive_timeout_alarmed = true ;
}
if ( is_receive_timeout_alarmed & & ! is_socket_ready )
2021-02-21 14:03:24 +00:00
{
destroyFiber ( ) ;
/// In not async case this exception would be thrown and caught in ConnectionEstablisher::run,
/// but in async case we process timeout outside and cannot throw exception. So, we just save fail message.
fail_message = " Timeout exceeded while reading from socket ( " + result . entry - > getDescription ( ) + " ) " ;
epoll . remove ( socket_fd ) ;
resetResult ( ) ;
return false ;
}
2021-02-17 17:34:52 +00:00
2021-02-21 14:03:24 +00:00
return true ;
2021-02-17 17:34:52 +00:00
}
2021-02-21 14:03:24 +00:00
void ConnectionEstablisherAsync : : cancel ( )
2021-02-17 17:34:52 +00:00
{
destroyFiber ( ) ;
reset ( ) ;
}
2021-02-21 14:03:24 +00:00
void ConnectionEstablisherAsync : : reset ( )
2021-02-17 17:34:52 +00:00
{
resetResult ( ) ;
2021-02-21 14:03:24 +00:00
fail_message . clear ( ) ;
socket_fd = - 1 ;
2021-02-17 17:34:52 +00:00
}
2021-02-21 14:03:24 +00:00
void ConnectionEstablisherAsync : : resetResult ( )
2021-02-17 17:34:52 +00:00
{
if ( ! result . entry . isNull ( ) )
{
result . entry - > disconnect ( ) ;
result . reset ( ) ;
}
}
2021-02-21 14:03:24 +00:00
void ConnectionEstablisherAsync : : destroyFiber ( )
2021-02-17 17:34:52 +00:00
{
Fiber to_destroy = std : : move ( fiber ) ;
fiber_created = false ;
}
2021-02-22 07:52:19 +00:00
# endif
2021-02-17 17:34:52 +00:00
}