2023-05-05 01:52:33 +00:00
# include "ReadWriteBufferFromHTTP.h"
2023-07-03 21:11:32 +00:00
# include <IO/HTTPCommon.h>
2024-03-03 13:22:40 +00:00
# include <Common/NetException.h>
# include <Poco/Net/NetException.h>
2023-07-03 21:11:32 +00:00
2023-05-05 01:52:33 +00:00
/// Profile event counters that this file increments through ProfileEvents::increment.
/// They are only declared here (extern).
namespace ProfileEvents
{
2024-01-24 11:19:10 +00:00
/// Incremented in seek(), setReadUntilPosition() and setReadUntilEnd() when the current
/// `impl` is dropped while atEndOfRequestedRangeGuess() reports false.
extern const Event ReadBufferSeekCancelConnection ;
2024-03-26 09:14:20 +00:00
/// Incremented in callImpl() right before a request is sent over the session.
extern const Event ReadWriteBufferFromHTTPRequestsSent ;
/// Incremented by the number of bytes obtained in nextImpl() and readBigAt().
extern const Event ReadWriteBufferFromHTTPBytes ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
namespace
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
bool isRetriableError ( const Poco : : Net : : HTTPResponse : : HTTPStatus http_status ) noexcept
2023-05-05 01:52:33 +00:00
{
static constexpr std : : array non_retriable_errors {
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_BAD_REQUEST ,
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_UNAUTHORIZED ,
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_NOT_FOUND ,
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_FORBIDDEN ,
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_NOT_IMPLEMENTED ,
Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_METHOD_NOT_ALLOWED } ;
return std : : all_of (
non_retriable_errors . begin ( ) , non_retriable_errors . end ( ) , [ & ] ( const auto status ) { return http_status ! = status ; } ) ;
}
2024-03-03 13:22:40 +00:00
/// Builds the URI that a redirect response points to.
/// An absolute Location value is returned as is. A relative one is appended to the path of
/// `prev_uri`, normalized with std::filesystem::weakly_canonical, and placed into a copy of `prev_uri`.
/// `response` is expected to carry a redirect status (checked with chassert).
Poco::URI getUriAfterRedirect(const Poco::URI & prev_uri, Poco::Net::HTTPResponse & response)
{
    chassert(DB::isRedirect(response.getStatus()));

    const auto location = response.get(" Location ");
    Poco::URI redirect_target(location);

    if (redirect_target.isRelative())
    {
        /// Relative redirect: combine with the previous path and normalize the result.
        const std::filesystem::path previous_path(prev_uri.getPath());
        const auto path = std::filesystem::weakly_canonical(previous_path / location);
        redirect_target = prev_uri;
        redirect_target.setPath(path);
    }

    return redirect_target;
}
2024-03-03 13:22:40 +00:00
class ReadBufferFromSessionResponse : public DB : : ReadBufferFromIStream
{
private :
DB : : HTTPSessionPtr session ;
public :
ReadBufferFromSessionResponse ( DB : : HTTPSessionPtr & & session_ , std : : istream & rstr , size_t size )
: ReadBufferFromIStream ( rstr , size )
, session ( std : : move ( session_ ) )
{
}
} ;
}
namespace DB
{
/// Error codes thrown from this file. They are only declared here (extern).
namespace ErrorCodes
{
/// Thrown by callWithRedirects() when the redirect counter exceeds max_redirects.
extern const int TOO_MANY_REDIRECTS ;
/// Thrown by initialize() and readBigAt() when a ranged request is not answered as expected.
extern const int HTTP_RANGE_NOT_SATISFIABLE ;
/// Thrown by the constructor for invalid retry settings or an invalid flag combination.
extern const int BAD_ARGUMENTS ;
/// Thrown by seek() for any whence other than SEEK_SET.
extern const int CANNOT_SEEK_THROUGH_FILE ;
/// Thrown by seek() for a negative offset.
extern const int SEEK_POSITION_OUT_OF_BOUND ;
/// Thrown by getFileSize() when the size is not known.
extern const int UNKNOWN_FILE_SIZE ;
}
std : : unique_ptr < ReadBuffer > ReadWriteBufferFromHTTP : : CallResult : : transformToReadBuffer ( size_t buf_size ) & &
{
chassert ( session ) ;
return std : : make_unique < ReadBufferFromSessionResponse > ( std : : move ( session ) , * response_stream , buf_size ) ;
}
bool ReadWriteBufferFromHTTP : : withPartialContent ( ) const
2023-05-05 01:52:33 +00:00
{
/**
* Add range header if we have some passed range
* or if we want to retry GET request on purpose .
*/
2024-03-03 13:22:40 +00:00
return read_range . begin | | read_range . end | | getOffset ( ) > 0 ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
size_t ReadWriteBufferFromHTTP : : getOffset ( ) const
{
return read_range . begin . value_or ( 0 ) + offset_from_begin_pos ;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : prepareRequest ( Poco : : Net : : HTTPRequest & request , std : : optional < HTTPRange > range ) const
2023-05-05 01:52:33 +00:00
{
2024-03-28 18:33:23 +00:00
request . setHost ( current_uri . getHost ( ) ) ;
2023-05-05 01:52:33 +00:00
if ( out_stream_callback )
request . setChunkedTransferEncoding ( true ) ;
else if ( method = = Poco : : Net : : HTTPRequest : : HTTP_POST )
request . setContentLength ( 0 ) ; /// No callback - no body
2023-05-05 03:11:51 +00:00
for ( const auto & [ header , value ] : http_header_entries )
2023-05-05 01:52:33 +00:00
request . set ( header , value ) ;
if ( range )
{
String range_header_value ;
if ( range - > end )
range_header_value = fmt : : format ( " bytes={}-{} " , * range - > begin , * range - > end ) ;
else
range_header_value = fmt : : format ( " bytes={}- " , * range - > begin ) ;
request . set ( " Range " , range_header_value ) ;
}
if ( ! credentials . getUsername ( ) . empty ( ) )
credentials . authenticate ( request ) ;
2023-05-05 03:11:51 +00:00
}
2024-03-03 13:22:40 +00:00
size_t ReadWriteBufferFromHTTP : : getFileSize ( )
2023-05-05 01:52:33 +00:00
{
if ( ! file_info )
file_info = getFileInfo ( ) ;
if ( file_info - > file_size )
return * file_info - > file_size ;
2024-03-03 13:22:40 +00:00
throw Exception ( ErrorCodes : : UNKNOWN_FILE_SIZE , " Cannot find out file size for: {} " , initial_uri . toString ( ) ) ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
bool ReadWriteBufferFromHTTP : : supportsReadAt ( )
2023-05-05 03:11:51 +00:00
{
if ( ! file_info )
file_info = getFileInfo ( ) ;
return method = = Poco : : Net : : HTTPRequest : : HTTP_GET & & file_info - > seekable ;
}
2024-03-03 13:22:40 +00:00
bool ReadWriteBufferFromHTTP : : checkIfActuallySeekable ( )
2023-05-05 01:52:33 +00:00
{
if ( ! file_info )
file_info = getFileInfo ( ) ;
return file_info - > seekable ;
}
2024-03-03 13:22:40 +00:00
/// The "file name" of this buffer is the textual form of the URI it was created with
/// (not the URI reached after redirects).
String ReadWriteBufferFromHTTP::getFileName() const
{
    String name = initial_uri.toString();
    return name;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : getHeadResponse ( Poco : : Net : : HTTPResponse & response )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
doWithRetries (
[ & ] ( )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
callWithRedirects ( response , Poco : : Net : : HTTPRequest : : HTTP_HEAD , { } ) ;
} ,
/*on_retry=*/ nullptr ,
/*mute_logging=*/ true ) ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
/// Creates the buffer for `uri_`.
///
/// - `method_`: when empty, POST is used if an output callback is given, GET otherwise.
/// - `read_settings_`: must satisfy http_max_tries >= 1 and
///   0 < http_retry_initial_backoff_ms < http_retry_max_backoff_ms, otherwise BAD_ARGUMENTS is thrown.
/// - `http_header_entries_`: a default User-Agent entry is appended unless one is already present.
/// - `delay_initialization`: when false, the first next() is performed right in the constructor;
///   this cannot be combined with `use_external_buffer_` (BAD_ARGUMENTS).
/// - `file_info_`: optional file info known in advance.
ReadWriteBufferFromHTTP::ReadWriteBufferFromHTTP(
    const HTTPConnectionGroupType & connection_group_,
    const Poco::URI & uri_,
    const std::string & method_,
    ProxyConfiguration proxy_config_,
    ReadSettings read_settings_,
    ConnectionTimeouts timeouts_,
    const Poco::Net::HTTPBasicCredentials & credentials_,
    const RemoteHostFilter * remote_host_filter_,
    size_t buffer_size_,
    size_t max_redirects_,
    OutStreamCallback out_stream_callback_,
    bool use_external_buffer_,
    bool http_skip_not_found_url_,
    HTTPHeaderEntries http_header_entries_,
    bool delay_initialization,
    std::optional<HTTPFileInfo> file_info_)
    : SeekableReadBuffer(nullptr, 0)
    , connection_group(connection_group_)
    , initial_uri(uri_)
    , method(!method_.empty() ? method_ : out_stream_callback_ ? Poco::Net::HTTPRequest::HTTP_POST : Poco::Net::HTTPRequest::HTTP_GET)
    , proxy_config(std::move(proxy_config_))
    , read_settings(std::move(read_settings_))
    , timeouts(std::move(timeouts_))
    , credentials(credentials_)
    , remote_host_filter(remote_host_filter_)
    , buffer_size(buffer_size_)
    , max_redirects(max_redirects_)
    , use_external_buffer(use_external_buffer_)
    , http_skip_not_found_url(http_skip_not_found_url_)
    , out_stream_callback(std::move(out_stream_callback_))
    , redirects(0)
    , http_header_entries{std::move(http_header_entries_)}
    , file_info(std::move(file_info_))
    , log(getLogger(" ReadWriteBufferFromHTTP "))
{
    current_uri = initial_uri;
    if (current_uri.getPath().empty())
        current_uri.setPath(" / ");

    if (read_settings.http_max_tries <= 0 || read_settings.http_retry_initial_backoff_ms <= 0
        || read_settings.http_retry_initial_backoff_ms >= read_settings.http_retry_max_backoff_ms)
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            " Invalid setting for http backoff, "
            " must be http_max_tries >= 1 (current is {}) and "
            " 0 < http_retry_initial_backoff_ms < settings.http_retry_max_backoff_ms (now 0 < {} < {}) ",
            read_settings.http_max_tries,
            read_settings.http_retry_initial_backoff_ms,
            read_settings.http_retry_max_backoff_ms);

    /// Configure User-Agent if it is not already set.
    /// HTTP field names are case-insensitive, so the lookup must be too: otherwise an entry
    /// spelled e.g. "user-agent" is not noticed, the default is appended after it and
    /// overrides the caller's value when the headers are set on the request in order.
    const std::string user_agent = " User-Agent ";
    const auto equals_ignoring_ascii_case = [](const std::string & lhs, const std::string & rhs)
    {
        const auto to_lower = [](unsigned char c) -> unsigned char
        {
            return (c >= 'A' && c <= 'Z') ? static_cast<unsigned char>(c - 'A' + 'a') : c;
        };

        return lhs.size() == rhs.size()
            && std::equal(lhs.begin(), lhs.end(), rhs.begin(),
                          [&](unsigned char a, unsigned char b) { return to_lower(a) == to_lower(b); });
    };

    const bool has_user_agent = std::any_of(
        http_header_entries.begin(), http_header_entries.end(),
        [&](const HTTPHeaderEntry & entry) { return equals_ignoring_ascii_case(entry.name, user_agent); });

    if (!has_user_agent)
        http_header_entries.emplace_back(user_agent, fmt::format(" ClickHouse/{} ", VERSION_STRING));

    if (!delay_initialization && use_external_buffer)
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            " Invalid setting for ReadWriteBufferFromHTTP "
            " delay_initialization is false and use_external_buffer is true. ");

    /// Eager mode: send the request and read the first chunk right away.
    if (!delay_initialization)
        next();
}
2024-03-03 13:22:40 +00:00
/// Sends a single `method_` request to current_uri and receives the response head into `response`.
/// The URL is checked against the remote host filter (if any) before anything is sent.
/// The request body, if there is an output callback, is produced by that callback.
/// The response is validated by assertResponseIsOk, which is told whether redirect statuses are allowed.
/// Returns the session together with the response body stream.
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callImpl(
    Poco::Net::HTTPResponse & response, const std::string & method_, const std::optional<HTTPRange> & range, bool allow_redirects) const
{
    if (remote_host_filter)
        remote_host_filter->checkURL(current_uri);

    Poco::Net::HTTPRequest http_request(method_, current_uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1);
    prepareRequest(http_request, range);

    auto http_session = makeHTTPSession(connection_group, current_uri, timeouts, proxy_config);

    ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPRequestsSent);

    auto & request_body = http_session->sendRequest(http_request);
    if (out_stream_callback)
        out_stream_callback(request_body);

    auto & response_body = http_session->receiveResponse(response);
    assertResponseIsOk(current_uri.toString(), response, response_body, allow_redirects);

    return CallResult(std::move(http_session), response_body);
}
/// Sends the request and keeps following redirects until a non-redirect response arrives.
/// Each followed redirect updates current_uri and increases the `redirects` counter;
/// TOO_MANY_REDIRECTS is thrown once the counter exceeds max_redirects.
ReadWriteBufferFromHTTP::CallResult ReadWriteBufferFromHTTP::callWithRedirects(
    Poco::Net::HTTPResponse & response, const String & method_, const std::optional<HTTPRange> & range)
{
    auto result = callImpl(response, method_, range, /*allow_redirects=*/ true);

    while (isRedirect(response.getStatus()))
    {
        const Poco::URI next_uri = getUriAfterRedirect(current_uri, response);

        if (++redirects > max_redirects)
            throw Exception(
                ErrorCodes::TOO_MANY_REDIRECTS,
                " Too many redirects while trying to access {}. "
                " You can {} redirects by changing the setting 'max_http_get_redirects'. "
                " Example: `SET max_http_get_redirects = 10`. "
                " Redirects are restricted to prevent possible attack when a malicious server redirects to an internal resource, bypassing the authentication or firewall. ",
                initial_uri.toString(), max_redirects ? " increase the allowed maximum number of " : " allow ");

        current_uri = next_uri;
        result = callImpl(response, method_, range, /*allow_redirects=*/ true);
    }

    return result;
}
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : doWithRetries ( std : : function < void ( ) > & & callable ,
std : : function < void ( ) > on_retry ,
bool mute_logging ) const
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
[[maybe_unused]] auto milliseconds_to_wait = read_settings . http_retry_initial_backoff_ms ;
bool is_retriable = true ;
std : : exception_ptr exception = nullptr ;
for ( size_t attempt = 1 ; attempt < = read_settings . http_max_tries ; + + attempt )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
[[maybe_unused]] bool last_attempt = attempt + 1 > read_settings . http_max_tries ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
String error_message ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
try
{
callable ( ) ;
return ;
}
catch ( Poco : : Net : : NetException & e )
{
error_message = e . displayText ( ) ;
exception = std : : current_exception ( ) ;
}
catch ( DB : : NetException & e )
{
error_message = e . displayText ( ) ;
exception = std : : current_exception ( ) ;
}
catch ( DB : : HTTPException & e )
{
if ( ! isRetriableError ( e . getHTTPStatus ( ) ) )
is_retriable = false ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
error_message = e . displayText ( ) ;
exception = std : : current_exception ( ) ;
}
catch ( DB : : Exception & e )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
is_retriable = false ;
error_message = e . displayText ( ) ;
exception = std : : current_exception ( ) ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
catch ( Poco : : Exception & e )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
if ( e . code ( ) = = POCO_EMFILE )
is_retriable = false ;
error_message = e . displayText ( ) ;
2023-05-05 01:52:33 +00:00
exception = std : : current_exception ( ) ;
}
2024-03-03 13:22:40 +00:00
chassert ( exception ) ;
if ( last_attempt | | ! is_retriable )
{
if ( ! mute_logging )
LOG_ERROR ( log ,
2024-03-28 18:33:23 +00:00
" Failed to make request to `{}`{}. "
" Error: '{}'. "
2024-03-03 13:22:40 +00:00
" Failed at try {}/{}. " ,
2024-03-28 18:33:23 +00:00
initial_uri . toString ( ) , current_uri = = initial_uri ? String ( ) : fmt : : format ( " redirect to `{}` " , current_uri . toString ( ) ) ,
error_message ,
2024-03-03 13:22:40 +00:00
attempt , read_settings . http_max_tries ) ;
std : : rethrow_exception ( exception ) ;
}
2023-05-05 01:52:33 +00:00
else
{
2024-03-03 13:22:40 +00:00
if ( on_retry )
on_retry ( ) ;
if ( ! mute_logging )
LOG_INFO ( log ,
2024-03-28 18:33:23 +00:00
" Failed to make request to `{}`{}. "
" Error: {}. "
2024-03-03 13:22:40 +00:00
" Failed at try {}/{}. "
" Will retry with current backoff wait is {}/{} ms. " ,
2024-03-28 18:33:23 +00:00
initial_uri . toString ( ) , current_uri = = initial_uri ? String ( ) : fmt : : format ( " redirect to `{}` " , current_uri . toString ( ) ) ,
error_message ,
2024-03-03 13:22:40 +00:00
attempt + 1 , read_settings . http_max_tries ,
milliseconds_to_wait , read_settings . http_retry_max_backoff_ms ) ;
sleepForMilliseconds ( milliseconds_to_wait ) ;
milliseconds_to_wait = std : : min ( milliseconds_to_wait * 2 , read_settings . http_retry_max_backoff_ms ) ;
2023-05-05 01:52:33 +00:00
}
}
}
2024-03-03 13:22:40 +00:00
std : : unique_ptr < ReadBuffer > ReadWriteBufferFromHTTP : : initialize ( )
2023-05-05 01:52:33 +00:00
{
Poco : : Net : : HTTPResponse response ;
2024-03-03 13:22:40 +00:00
std : : optional < HTTPRange > range ;
if ( withPartialContent ( ) )
range = HTTPRange { getOffset ( ) , read_range . end } ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
auto result = callWithRedirects ( response , method , range ) ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
if ( range . has_value ( ) & & response . getStatus ( ) ! = Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_PARTIAL_CONTENT )
2023-05-05 01:52:33 +00:00
{
/// Having `200 OK` instead of `206 Partial Content` is acceptable in case we retried with range.begin == 0.
if ( getOffset ( ) ! = 0 )
{
2024-03-03 13:22:40 +00:00
/// Retry 200OK
if ( response . getStatus ( ) = = Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_OK )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
String reason = fmt : : format (
" Cannot read with range: [{}, {}] (response status: {}, reason: {}), will retry " ,
* read_range . begin , read_range . end ? toString ( * read_range . end ) : " - " ,
toString ( response . getStatus ( ) ) , response . getReason ( ) ) ;
/// it is retriable error
throw HTTPException (
ErrorCodes : : HTTP_RANGE_NOT_SATISFIABLE ,
current_uri . toString ( ) ,
Poco : : Net : : HTTPResponse : : HTTP_REQUESTED_RANGE_NOT_SATISFIABLE ,
reason ,
" " ) ;
}
else
throw Exception (
2023-05-05 01:52:33 +00:00
ErrorCodes : : HTTP_RANGE_NOT_SATISFIABLE ,
" Cannot read with range: [{}, {}] (response status: {}, reason: {}) " ,
* read_range . begin ,
read_range . end ? toString ( * read_range . end ) : " - " ,
2024-03-03 13:22:40 +00:00
toString ( response . getStatus ( ) ) , response . getReason ( ) ) ;
2023-05-05 01:52:33 +00:00
}
else if ( read_range . end )
{
/// We could have range.begin == 0 and range.end != 0 in case of DiskWeb and failing to read with partial content
/// will affect only performance, so a warning is enough.
2023-05-05 03:11:51 +00:00
LOG_WARNING ( log , " Unable to read with range header: [{}, {}] " , read_range . begin . value_or ( 0 ) , * read_range . end ) ;
2023-05-05 01:52:33 +00:00
}
}
2024-03-03 13:22:40 +00:00
response . getCookies ( cookies ) ;
content_encoding = response . get ( " Content-Encoding " , " " ) ;
2023-05-05 01:52:33 +00:00
// Remember file size. It'll be used to report eof in next nextImpl() call.
if ( ! read_range . end & & response . hasContentLength ( ) )
2024-03-03 13:22:40 +00:00
file_info = parseFileInfo ( response , range . has_value ( ) ? getOffset ( ) : 0 ) ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
return std : : move ( result ) . transformToReadBuffer ( use_external_buffer ? 0 : buffer_size ) ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
bool ReadWriteBufferFromHTTP : : nextImpl ( )
2023-05-05 01:52:33 +00:00
{
if ( next_callback )
next_callback ( count ( ) ) ;
2024-03-03 13:22:40 +00:00
bool next_result = false ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
doWithRetries (
/*callable=*/ [ & ] ( )
2023-05-05 01:52:33 +00:00
{
if ( ! impl )
{
2024-03-03 13:22:40 +00:00
try
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
impl = initialize ( ) ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
catch ( HTTPException & e )
2023-05-05 01:52:33 +00:00
{
2024-03-03 13:22:40 +00:00
if ( http_skip_not_found_url & & e . getHTTPStatus ( ) = = Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_NOT_FOUND )
{
next_result = false ;
2024-03-12 18:02:58 +00:00
has_not_found_url = true ;
2024-03-03 13:22:40 +00:00
return ;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
throw ;
}
2023-05-05 01:52:33 +00:00
if ( use_external_buffer )
{
2024-03-03 13:22:40 +00:00
impl - > set ( internal_buffer . begin ( ) , internal_buffer . size ( ) ) ;
}
else
{
BufferBase : : set ( impl - > buffer ( ) . begin ( ) , impl - > buffer ( ) . size ( ) , impl - > offset ( ) ) ;
2023-05-05 01:52:33 +00:00
}
}
2024-03-03 13:22:40 +00:00
if ( use_external_buffer )
{
impl - > set ( internal_buffer . begin ( ) , internal_buffer . size ( ) ) ;
}
else
{
impl - > position ( ) = position ( ) ;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
next_result = impl - > next ( ) ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
BufferBase : : set ( impl - > buffer ( ) . begin ( ) , impl - > buffer ( ) . size ( ) , impl - > offset ( ) ) ;
offset_from_begin_pos + = working_buffer . size ( ) ;
2024-03-26 09:14:20 +00:00
ProfileEvents : : increment ( ProfileEvents : : ReadWriteBufferFromHTTPBytes , working_buffer . size ( ) ) ;
2024-03-03 13:22:40 +00:00
} ,
/*on_retry=*/ [ & ] ( )
{
impl . reset ( ) ;
} ) ;
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
return next_result ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
size_t ReadWriteBufferFromHTTP : : readBigAt ( char * to , size_t n , size_t offset , const std : : function < bool ( size_t ) > & progress_callback ) const
2023-05-05 03:11:51 +00:00
{
/// Caller must have checked supportsReadAt().
2024-03-03 13:22:40 +00:00
/// This ensures we've sent at least one HTTP request and populated current_uri.
2023-05-05 03:11:51 +00:00
chassert ( file_info & & file_info - > seekable ) ;
2023-06-26 21:49:44 +00:00
size_t initial_n = n ;
2024-03-03 13:22:40 +00:00
size_t total_bytes_copied = 0 ;
size_t bytes_copied = 0 ;
bool is_canceled = false ;
2023-05-05 03:11:51 +00:00
2024-03-03 13:22:40 +00:00
doWithRetries (
/*callable=*/ [ & ] ( )
2023-05-05 03:11:51 +00:00
{
2024-03-03 13:22:40 +00:00
auto range = HTTPRange { offset , offset + n - 1 } ;
Poco : : Net : : HTTPResponse response ;
2024-03-28 18:33:23 +00:00
auto result = callImpl ( response , method , range , false ) ;
2023-05-05 03:11:51 +00:00
if ( response . getStatus ( ) ! = Poco : : Net : : HTTPResponse : : HTTPStatus : : HTTP_PARTIAL_CONTENT & &
( offset ! = 0 | | offset + n < * file_info - > file_size ) )
2023-07-06 16:58:13 +00:00
{
2024-03-03 13:22:40 +00:00
String reason = fmt : : format (
" When reading with readBigAt {}. "
" Cannot read with range: [{}, {}] (response status: {}, reason: {}), will retry " ,
initial_uri . toString ( ) ,
* range . begin , * range . end ,
toString ( response . getStatus ( ) ) , response . getReason ( ) ) ;
throw HTTPException (
ErrorCodes : : HTTP_RANGE_NOT_SATISFIABLE ,
current_uri . toString ( ) ,
Poco : : Net : : HTTPResponse : : HTTP_REQUESTED_RANGE_NOT_SATISFIABLE ,
reason ,
" " ) ;
2023-07-06 16:58:13 +00:00
}
2023-05-05 03:11:51 +00:00
2024-03-03 13:22:40 +00:00
copyFromIStreamWithProgressCallback ( * result . response_stream , to , n , progress_callback , & bytes_copied , & is_canceled ) ;
2023-06-26 21:49:44 +00:00
2024-03-26 09:14:20 +00:00
ProfileEvents : : increment ( ProfileEvents : : ReadWriteBufferFromHTTPBytes , bytes_copied ) ;
2024-03-03 13:22:40 +00:00
offset + = bytes_copied ;
total_bytes_copied + = bytes_copied ;
to + = bytes_copied ;
n - = bytes_copied ;
bytes_copied = 0 ;
} ,
/*on_retry=*/ [ & ] ( )
{
2024-03-26 09:14:20 +00:00
ProfileEvents : : increment ( ProfileEvents : : ReadWriteBufferFromHTTPBytes , bytes_copied ) ;
2024-03-03 13:22:40 +00:00
offset + = bytes_copied ;
total_bytes_copied + = bytes_copied ;
to + = bytes_copied ;
n - = bytes_copied ;
bytes_copied = 0 ;
} ) ;
2023-06-26 21:49:44 +00:00
2024-03-03 13:22:40 +00:00
chassert ( total_bytes_copied = = initial_n | | is_canceled ) ;
return total_bytes_copied ;
2023-05-05 03:11:51 +00:00
}
2024-03-03 13:22:40 +00:00
off_t ReadWriteBufferFromHTTP : : getPosition ( )
{
return getOffset ( ) - available ( ) ;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
off_t ReadWriteBufferFromHTTP : : seek ( off_t offset_ , int whence )
2023-05-05 01:52:33 +00:00
{
if ( whence ! = SEEK_SET )
throw Exception ( ErrorCodes : : CANNOT_SEEK_THROUGH_FILE , " Only SEEK_SET mode is allowed. " ) ;
if ( offset_ < 0 )
throw Exception ( ErrorCodes : : SEEK_POSITION_OUT_OF_BOUND , " Seek position is out of bounds. Offset: {} " ,
offset_ ) ;
off_t current_offset = getOffset ( ) ;
if ( ! working_buffer . empty ( ) & & size_t ( offset_ ) > = current_offset - working_buffer . size ( ) & & offset_ < current_offset )
{
pos = working_buffer . end ( ) - ( current_offset - offset_ ) ;
2024-03-03 13:22:40 +00:00
chassert ( pos > = working_buffer . begin ( ) ) ;
chassert ( pos < working_buffer . end ( ) ) ;
2023-05-05 01:52:33 +00:00
return getPosition ( ) ;
}
if ( impl )
{
auto position = getPosition ( ) ;
2024-03-26 08:53:13 +00:00
if ( offset_ > = position )
2023-05-05 01:52:33 +00:00
{
size_t diff = offset_ - position ;
2024-03-03 13:22:40 +00:00
if ( diff < read_settings . remote_read_min_bytes_for_seek )
2023-05-05 01:52:33 +00:00
{
ignore ( diff ) ;
return offset_ ;
}
}
if ( ! atEndOfRequestedRangeGuess ( ) )
ProfileEvents : : increment ( ProfileEvents : : ReadBufferSeekCancelConnection ) ;
2024-03-03 13:22:40 +00:00
2023-05-05 01:52:33 +00:00
impl . reset ( ) ;
}
resetWorkingBuffer ( ) ;
read_range . begin = offset_ ;
offset_from_begin_pos = 0 ;
return offset_ ;
}
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : setReadUntilPosition ( size_t until )
2023-05-05 01:52:33 +00:00
{
until = std : : max ( until , 1ul ) ;
if ( read_range . end & & * read_range . end + 1 = = until )
return ;
read_range . end = until - 1 ;
read_range . begin = getPosition ( ) ;
resetWorkingBuffer ( ) ;
if ( impl )
{
if ( ! atEndOfRequestedRangeGuess ( ) )
ProfileEvents : : increment ( ProfileEvents : : ReadBufferSeekCancelConnection ) ;
impl . reset ( ) ;
}
}
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : setReadUntilEnd ( )
2023-05-05 01:52:33 +00:00
{
if ( ! read_range . end )
return ;
read_range . end . reset ( ) ;
read_range . begin = getPosition ( ) ;
resetWorkingBuffer ( ) ;
if ( impl )
{
if ( ! atEndOfRequestedRangeGuess ( ) )
ProfileEvents : : increment ( ProfileEvents : : ReadBufferSeekCancelConnection ) ;
impl . reset ( ) ;
}
}
2024-03-03 13:22:40 +00:00
bool ReadWriteBufferFromHTTP : : supportsRightBoundedReads ( ) const { return true ; }
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
bool ReadWriteBufferFromHTTP : : atEndOfRequestedRangeGuess ( )
2023-05-05 01:52:33 +00:00
{
if ( ! impl )
return true ;
if ( read_range . end )
return getPosition ( ) > static_cast < off_t > ( * read_range . end ) ;
if ( file_info & & file_info - > file_size )
return getPosition ( ) > = static_cast < off_t > ( * file_info - > file_size ) ;
return false ;
}
2024-03-03 13:22:40 +00:00
std : : string ReadWriteBufferFromHTTP : : getResponseCookie ( const std : : string & name , const std : : string & def ) const
2023-05-05 01:52:33 +00:00
{
for ( const auto & cookie : cookies )
if ( cookie . getName ( ) = = name )
return cookie . getValue ( ) ;
return def ;
}
2024-03-03 13:22:40 +00:00
void ReadWriteBufferFromHTTP : : setNextCallback ( NextCallback next_callback_ )
2023-05-05 01:52:33 +00:00
{
next_callback = next_callback_ ;
/// Some data maybe already read
next_callback ( count ( ) ) ;
}
2024-03-03 13:22:40 +00:00
const std : : string & ReadWriteBufferFromHTTP : : getCompressionMethod ( ) const
{
return content_encoding ;
}
2023-05-05 01:52:33 +00:00
2024-03-03 13:22:40 +00:00
std : : optional < time_t > ReadWriteBufferFromHTTP : : tryGetLastModificationTime ( )
2023-05-05 01:52:33 +00:00
{
2023-08-22 11:59:59 +00:00
if ( ! file_info )
2023-08-23 18:43:08 +00:00
{
try
{
file_info = getFileInfo ( ) ;
}
catch ( . . . )
{
return std : : nullopt ;
}
}
2023-08-22 11:59:59 +00:00
return file_info - > last_modified ;
2023-05-05 01:52:33 +00:00
}
2024-03-03 13:22:40 +00:00
/// Obtains file info (size, seekability, modification time) with a HEAD request.
/// Returns an empty info without any request when HEAD requests are disabled in the read settings,
/// and also when the server answers with a 4xx status other than 429. Other errors propagate.
ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::getFileInfo()
{
    /// May be disabled in case the user knows in advance that the server doesn't support HEAD requests.
    /// Allows to avoid making unnecessary requests in such cases.
    if (!read_settings.http_make_head_request)
        return HTTPFileInfo{};

    Poco::Net::HTTPResponse head_response;

    try
    {
        getHeadResponse(head_response);
    }
    catch (HTTPException & e)
    {
        /// Maybe the web server doesn't support HEAD requests (e.g. webhdfs reports status 400).
        /// Proceed in hopes that the actual GET request will succeed - but only for errors that
        /// are not transient, so that we never nondeterministically fall back to slow whole-file
        /// reads when HEAD is actually supported.
        const auto status = e.getHTTPStatus();
        const bool is_client_error = status >= 400 && status <= 499;

        if (is_client_error && status != Poco::Net::HTTPResponse::HTTP_TOO_MANY_REQUESTS)
            return HTTPFileInfo{};

        throw;
    }

    return parseFileInfo(head_response, 0);
}
2024-03-03 13:22:40 +00:00
/// Extracts file info from response headers.
///  - Size: taken from the content length; for `206 Partial Content` the content length covers only
///    the part after `requested_range_begin`, so that offset is added back.
///  - Seekable: true for `206 Partial Content`, otherwise decided by the Accept-Ranges header.
///    Both are left at their defaults when the response has no content length.
///  - Last modified: set only if the Last-Modified header is parsed completely by strptime.
ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::parseFileInfo(const Poco::Net::HTTPResponse & response, size_t requested_range_begin)
{
    HTTPFileInfo res;

    if (response.hasContentLength())
    {
        res.file_size = response.getContentLength();

        const bool is_partial_content = response.getStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_PARTIAL_CONTENT;
        if (is_partial_content)
        {
            *res.file_size += requested_range_begin;
            res.seekable = true;
        }
        else
        {
            res.seekable = response.has(" Accept-Ranges ") && response.get(" Accept-Ranges ") == " bytes ";
        }
    }

    if (response.has(" Last-Modified "))
    {
        String date_str = response.get(" Last-Modified ");
        struct tm info;
        const char * parsed_until = strptime(date_str.data(), " %a, %d %b %Y %H:%M:%S %Z ", &info);

        /// Accept the date only when the whole header value has been consumed.
        const char * const value_end = date_str.data() + date_str.size();
        if (parsed_until == value_end)
            res.last_modified = timegm(&info);
    }

    return res;
}
}