2022-01-22 22:56:24 +00:00
# include "FileSegment.h"
2022-01-30 11:35:28 +00:00
# include <base/getThreadId.h>
2022-01-22 22:56:24 +00:00
# include <Common/FileCache.h>
2022-01-30 11:35:28 +00:00
# include <Common/hex.h>
2022-02-02 14:25:25 +00:00
# include <IO/WriteBufferFromString.h>
# include <IO/Operators.h>
2022-01-30 11:35:28 +00:00
# include <filesystem>
2022-01-22 22:56:24 +00:00
namespace DB
{
namespace ErrorCodes
{
2022-02-18 15:38:23 +00:00
extern const int REMOTE_FS_OBJECT_CACHE_ERROR ;
2022-01-30 11:35:28 +00:00
extern const int LOGICAL_ERROR ;
2022-01-22 22:56:24 +00:00
}
/// Creates a file segment covering [offset_, offset_ + size_ - 1] of the object identified by key_.
/// Only EMPTY, DOWNLOADED and DOWNLOADING are accepted as initial states; anything else throws.
FileSegment::FileSegment(
        size_t offset_,
        size_t size_,
        const Key & key_,
        IFileCache * cache_,
        State download_state_)
    : segment_range(offset_, offset_ + size_ - 1)
    , download_state(download_state_)
    , file_key(key_)
    , cache(cache_)
#ifndef NDEBUG
    , log(&Poco::Logger::get(fmt::format(" FileSegment({}) : {} ", getHexUIntLowercase(key_), range().toString())))
#else
    , log(&Poco::Logger::get(" FileSegment "))
#endif
{
    /// EMPTY: the segment is not in cache yet; somebody may later become its downloader
    /// through getOrSetDownloader().
    if (download_state == State::EMPTY)
        return;

    /// DOWNLOADED: used when cache metadata is loaded on server startup, or when the segment
    /// object is recreated by reduceSizeToDownloaded(). The whole range is already on disk.
    if (download_state == State::DOWNLOADED)
    {
        downloaded_size = size_;
        reserved_size = size_;
        return;
    }

    /// DOWNLOADING: write-through caching -- the creator is the downloader right away,
    /// so getOrSetDownloader() is not needed.
    if (download_state == State::DOWNLOADING)
    {
        downloader_id = getCallerId();
        return;
    }

    throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Can create cell with either EMPTY, DOWNLOADED, DOWNLOADING state ");
}
2022-01-23 16:51:18 +00:00
/// Returns a snapshot of the current download state, read under the segment mutex.
FileSegment::State FileSegment::state() const
{
    std::lock_guard lock(mutex);
    const State current = download_state;
    return current;
}
2022-02-18 15:38:23 +00:00
size_t FileSegment : : getDownloadOffset ( ) const
2022-01-22 22:56:24 +00:00
{
std : : lock_guard segment_lock ( mutex ) ;
2022-03-14 16:33:29 +00:00
return range ( ) . left + getDownloadedSize ( segment_lock ) ;
}
size_t FileSegment : : getDownloadedSize ( std : : lock_guard < std : : mutex > & /* segment_lock */ ) const
{
if ( download_state = = State : : DOWNLOADED )
return downloaded_size ;
std : : lock_guard download_lock ( download_mutex ) ;
return downloaded_size ;
2022-01-22 22:56:24 +00:00
}
/// Strict caller identification: throws if the cache must be bypassed (no query id).
String FileSegment::getCallerId()
{
    constexpr bool allow_non_strict_checking = false;
    return getCallerIdImpl(allow_non_strict_checking);
}
/// Builds an identifier of the calling thread in the form "<query id> : <thread id>".
/// When IFileCache::shouldBypassCache() is true (no usable query id):
///  - with allow_non_strict_checking, returns the placeholder " None ";
///  - otherwise throws REMOTE_FS_OBJECT_CACHE_ERROR.
String FileSegment::getCallerIdImpl(bool allow_non_strict_checking)
{
    if (!IFileCache::shouldBypassCache())
        return CurrentThread::getQueryId().toString() + " : " + toString(getThreadId());

    /// The lenient mode exists for completeImpl() reached through complete(), which runs from the
    /// destructor of CachedReadBufferFromRemoteFS when the query id is already gone. Returning a
    /// placeholder there is safe: a destructor-time caller is never the downloader, because the
    /// downloader is reset on every nextImpl() call, either explicitly or via SCOPE_EXIT.
    if (!allow_non_strict_checking)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Cannot use cache without query id ");

    return " None ";
}
/// Returns the id of the current downloader, making the caller the downloader if nobody is.
/// A segment can only acquire a downloader from EMPTY or PARTIALLY_DOWNLOADED. In any other
/// downloader-less state the placeholder " None " is returned. If the caller already is the
/// downloader, this throws.
String FileSegment::getOrSetDownloader()
{
    std::lock_guard segment_lock(mutex);

    if (!downloader_id.empty())
    {
        if (downloader_id == getCallerId())
            throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR,
                            " Attempt to set the same downloader for segment {} for the second time ", range().toString());
        return downloader_id;
    }

    assert(download_state != State::DOWNLOADING);

    const bool can_take_ownership = download_state == State::EMPTY
        || download_state == State::PARTIALLY_DOWNLOADED;
    if (!can_take_ownership)
        return " None ";

    downloader_id = getCallerId();
    download_state = State::DOWNLOADING;
    return downloader_id;
}
2022-03-06 19:33:07 +00:00
void FileSegment : : resetDownloader ( )
{
std : : lock_guard segment_lock ( mutex ) ;
if ( downloader_id . empty ( ) )
2022-03-09 09:36:52 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " There is no downloader " ) ;
2022-03-06 19:33:07 +00:00
if ( getCallerId ( ) ! = downloader_id )
2022-03-09 09:36:52 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Downloader can be reset only by downloader " ) ;
resetDownloaderImpl ( segment_lock ) ;
}
2022-03-06 19:33:07 +00:00
2022-03-09 09:36:52 +00:00
void FileSegment : : resetDownloaderImpl ( std : : lock_guard < std : : mutex > & segment_lock )
{
2022-03-07 21:03:12 +00:00
if ( downloaded_size = = range ( ) . size ( ) )
setDownloaded ( segment_lock ) ;
else
download_state = State : : PARTIALLY_DOWNLOADED ;
2022-03-06 19:33:07 +00:00
downloader_id . clear ( ) ;
}
2022-01-26 18:43:23 +00:00
/// Returns a copy of the current downloader id (empty if none), read under the segment mutex.
String FileSegment::getDownloader() const
{
    std::lock_guard segment_lock(mutex);
    String current_downloader = downloader_id;
    return current_downloader;
}
bool FileSegment : : isDownloader ( ) const
{
std : : lock_guard segment_lock ( mutex ) ;
2022-03-29 15:33:02 +00:00
return getCallerId ( ) = = downloader_id ;
2022-01-22 22:56:24 +00:00
}
2022-01-26 18:43:23 +00:00
/// Returns the remote reader attached to this segment. Only the downloader may access it.
FileSegment::RemoteFileReaderPtr FileSegment::getRemoteFileReader()
{
    if (isDownloader())
        return remote_file_reader;

    throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Only downloader can use remote filesystem file reader ");
}
void FileSegment : : setRemoteFileReader ( RemoteFileReaderPtr remote_file_reader_ )
{
if ( ! isDownloader ( ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Only downloader can use remote filesystem file reader " ) ;
2022-01-26 18:43:23 +00:00
if ( remote_file_reader )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Remote file reader already exists " ) ;
remote_file_reader = remote_file_reader_ ;
}
2022-03-17 19:29:07 +00:00
void FileSegment : : resetRemoteFileReader ( )
{
if ( ! isDownloader ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Only downloader can use remote filesystem file reader " ) ;
if ( ! remote_file_reader )
throw Exception ( ErrorCodes : : LOGICAL_ERROR , " Remote file reader does not exist " ) ;
remote_file_reader . reset ( ) ;
}
2022-03-16 12:27:58 +00:00
void FileSegment : : write ( const char * from , size_t size , size_t offset_ )
2022-01-22 22:56:24 +00:00
{
2022-01-23 16:51:18 +00:00
if ( ! size )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Writing zero size is not allowed " ) ;
2022-01-23 16:51:18 +00:00
2022-01-24 22:07:02 +00:00
if ( availableSize ( ) < size )
2022-01-22 22:56:24 +00:00
throw Exception (
2022-02-18 15:38:23 +00:00
ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-24 22:07:02 +00:00
" Not enough space is reserved. Available: {}, expected: {} " , availableSize ( ) , size ) ;
2022-01-22 22:56:24 +00:00
2022-01-23 16:51:18 +00:00
if ( ! isDownloader ( ) )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-01-26 18:43:23 +00:00
" Only downloader can do the downloading. (CallerId: {}, DownloaderId: {}) " ,
getCallerId ( ) , downloader_id ) ;
2022-01-23 16:51:18 +00:00
2022-03-17 16:50:51 +00:00
if ( downloaded_size = = range ( ) . size ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Attempt to write {} bytes to offset: {}, but current file segment is already fully downloaded " ,
size , offset_ ) ;
2022-03-14 16:33:29 +00:00
auto download_offset = range ( ) . left + downloaded_size ;
if ( offset_ ! = download_offset )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
2022-03-17 16:50:51 +00:00
" Attempt to write {} bytes to offset: {}, but current download offset is {} " ,
2022-03-14 16:33:29 +00:00
size , offset_ , download_offset ) ;
2022-03-16 12:27:58 +00:00
2022-01-26 18:43:23 +00:00
if ( ! cache_writer )
2022-01-22 22:56:24 +00:00
{
2022-03-14 16:33:29 +00:00
if ( downloaded_size > 0 )
2022-03-17 16:50:51 +00:00
throw Exception ( ErrorCodes : : LOGICAL_ERROR ,
2022-03-18 09:16:06 +00:00
" Cache writer was finalized (downloaded size: {}, state: {}) " ,
2022-03-17 16:50:51 +00:00
downloaded_size , stateToString ( download_state ) ) ;
2022-03-14 16:33:29 +00:00
2022-02-18 15:38:23 +00:00
auto download_path = cache - > getPathInLocalCache ( key ( ) , offset ( ) ) ;
2022-01-26 18:43:23 +00:00
cache_writer = std : : make_unique < WriteBufferFromFile > ( download_path ) ;
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
try
{
cache_writer - > write ( from , size ) ;
2022-03-14 16:33:29 +00:00
std : : lock_guard download_lock ( download_mutex ) ;
2022-02-02 14:25:25 +00:00
cache_writer - > next ( ) ;
2022-03-14 16:33:29 +00:00
downloaded_size + = size ;
2022-02-02 14:25:25 +00:00
}
2022-03-17 19:29:07 +00:00
catch ( Exception & e )
2022-02-02 14:25:25 +00:00
{
2022-02-18 15:38:23 +00:00
std : : lock_guard segment_lock ( mutex ) ;
2022-03-17 19:29:07 +00:00
auto info = getInfoForLogImpl ( segment_lock ) ;
e . addMessage ( " while writing into cache, info: " + info ) ;
LOG_ERROR ( log , " Failed to write to cache. File segment info: {} " , info ) ;
2022-03-10 09:56:48 +00:00
2022-02-15 09:11:33 +00:00
download_state = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION ;
2022-02-18 15:38:23 +00:00
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
2022-03-16 12:27:58 +00:00
cv . notify_all ( ) ;
2022-02-02 14:25:25 +00:00
throw ;
}
2022-03-16 13:29:21 +00:00
2022-03-14 16:33:29 +00:00
assert ( getDownloadOffset ( ) = = offset_ + size ) ;
2022-01-22 22:56:24 +00:00
}
/// Waits for another thread's download to make progress.
/// - With no downloader, returns the current state immediately.
/// - An EMPTY segment that has a downloader is an error.
/// - Otherwise, if the segment is DOWNLOADING, blocks on cv for at most 60 seconds.
/// In every case the returned state is the one observed afterwards. It may still be
/// DOWNLOADING (timeout or spurious wakeup), so callers presumably re-check in a loop.
FileSegment::State FileSegment::wait()
{
    std::unique_lock segment_lock(mutex);

    if (downloader_id.empty())
        return download_state;

    if (download_state == State::EMPTY)
        throw Exception(ErrorCodes::REMOTE_FS_OBJECT_CACHE_ERROR, " Cannot wait on a file segment with empty state ");

    if (download_state != State::DOWNLOADING)
        return download_state;

    LOG_TEST(log, " {} waiting on: {}, current downloader: {} ", getCallerId(), range().toString(), downloader_id);

    assert(!downloader_id.empty());
    assert(downloader_id != getCallerId());

    cv.wait_for(segment_lock, std::chrono::seconds(60));

    return download_state;
}
bool FileSegment : : reserve ( size_t size )
{
if ( ! size )
2022-02-18 15:38:23 +00:00
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Zero space reservation is not allowed " ) ;
2022-01-22 22:56:24 +00:00
2022-02-24 14:20:51 +00:00
{
std : : lock_guard segment_lock ( mutex ) ;
2022-01-22 22:56:24 +00:00
2022-03-14 16:33:29 +00:00
auto caller_id = getCallerId ( ) ;
if ( downloader_id ! = caller_id )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " Space can be reserved only by downloader (current: {}, expected: {}) " , caller_id, downloader_id) ;
2022-02-24 14:20:51 +00:00
if ( downloaded_size + size > range ( ) . size ( ) )
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Attempt to reserve space too much space ({}) for file segment with range: {} (downloaded size: {}) " ,
size , range ( ) . toString ( ) , downloaded_size ) ;
2022-01-22 22:56:24 +00:00
2022-02-24 14:20:51 +00:00
assert ( reserved_size > = downloaded_size ) ;
}
2022-01-22 22:56:24 +00:00
/**
* It is possible to have downloaded_size < reserved_size when reserve is called
* in case previous downloader did not fully download current file_segment
* and the caller is going to continue ;
*/
size_t free_space = reserved_size - downloaded_size ;
size_t size_to_reserve = size - free_space ;
2022-02-18 15:38:23 +00:00
std : : lock_guard cache_lock ( cache - > mutex ) ;
2022-01-23 16:51:18 +00:00
bool reserved = cache - > tryReserve ( key ( ) , offset ( ) , size_to_reserve , cache_lock ) ;
2022-01-22 22:56:24 +00:00
if ( reserved )
reserved_size + = size ;
return reserved ;
}
2022-02-23 10:12:14 +00:00
void FileSegment : : setDownloaded ( std : : lock_guard < std : : mutex > & /* segment_lock */ )
{
download_state = State : : DOWNLOADED ;
is_downloaded = true ;
if ( cache_writer )
{
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
2022-03-11 11:57:57 +00:00
remote_file_reader . reset ( ) ;
2022-02-23 10:12:14 +00:00
}
}
2022-01-26 18:43:23 +00:00
void FileSegment : : completeBatchAndResetDownloader ( )
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
std : : lock_guard segment_lock ( mutex ) ;
bool is_downloader = downloader_id = = getCallerId ( ) ;
if ( ! is_downloader )
2022-01-22 22:56:24 +00:00
{
2022-02-23 13:43:40 +00:00
cv . notify_all ( ) ;
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR , " File segment can be completed only by downloader " ) ;
}
2022-01-22 22:56:24 +00:00
2022-03-09 09:36:52 +00:00
resetDownloaderImpl ( segment_lock ) ;
2022-02-23 13:43:40 +00:00
LOG_TEST ( log , " Complete batch. Current downloaded size: {} " , downloaded_size ) ;
2022-01-26 09:35:46 +00:00
cv . notify_all ( ) ;
}
2022-03-11 11:17:17 +00:00
void FileSegment : : complete ( State state )
2022-01-26 09:35:46 +00:00
{
2022-03-17 19:29:07 +00:00
std : : lock_guard cache_lock ( cache - > mutex ) ;
std : : lock_guard segment_lock ( mutex ) ;
2022-03-01 16:00:54 +00:00
2022-03-17 19:29:07 +00:00
bool is_downloader = downloader_id = = getCallerId ( ) ;
if ( ! is_downloader )
2022-02-23 13:43:40 +00:00
{
2022-03-17 19:29:07 +00:00
cv . notify_all ( ) ;
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" File segment can be completed only by downloader or downloader's FileSegmentsHodler " ) ;
}
2022-02-15 09:11:33 +00:00
2022-03-17 19:29:07 +00:00
if ( state ! = State : : DOWNLOADED
& & state ! = State : : PARTIALLY_DOWNLOADED
& & state ! = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION )
{
cv . notify_all ( ) ;
throw Exception ( ErrorCodes : : REMOTE_FS_OBJECT_CACHE_ERROR ,
" Cannot complete file segment with state: {} " , stateToString ( state ) ) ;
2022-03-01 16:00:54 +00:00
}
2022-03-17 19:29:07 +00:00
download_state = state ;
2022-01-23 17:33:22 +00:00
2022-03-16 12:27:58 +00:00
try
{
2022-03-17 19:29:07 +00:00
completeImpl ( cache_lock , segment_lock ) ;
2022-03-16 12:27:58 +00:00
}
catch ( . . . )
{
if ( ! downloader_id . empty ( ) & & downloader_id = = getCallerIdImpl ( true ) )
downloader_id . clear ( ) ;
cv . notify_all ( ) ;
throw ;
2022-03-01 16:00:54 +00:00
}
2022-02-23 13:43:40 +00:00
2022-01-24 22:07:02 +00:00
cv . notify_all ( ) ;
}
2022-01-22 22:56:24 +00:00
2022-03-17 19:29:07 +00:00
void FileSegment : : complete ( std : : lock_guard < std : : mutex > & cache_lock )
2022-01-24 22:07:02 +00:00
{
2022-03-17 19:29:07 +00:00
std : : lock_guard segment_lock ( mutex ) ;
2022-01-23 16:51:18 +00:00
2022-03-17 19:29:07 +00:00
if ( download_state = = State : : SKIP_CACHE | | detached )
return ;
2022-01-22 22:56:24 +00:00
2022-03-17 19:29:07 +00:00
if ( download_state ! = State : : DOWNLOADED & & getDownloadedSize ( segment_lock ) = = range ( ) . size ( ) )
setDownloaded ( segment_lock ) ;
2022-01-23 16:51:18 +00:00
2022-03-21 18:48:13 +00:00
if ( download_state = = State : : DOWNLOADING | | download_state = = State : : EMPTY )
{
/// Segment state can be changed from DOWNLOADING or EMPTY only if the caller is the
/// downloader or the only owner of the segment.
2022-01-22 22:56:24 +00:00
2022-03-21 18:48:13 +00:00
bool can_update_segment_state = downloader_id = = getCallerIdImpl ( true )
| | cache - > isLastFileSegmentHolder ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
2022-01-23 17:33:22 +00:00
2022-03-21 18:48:13 +00:00
if ( can_update_segment_state )
2022-03-01 16:00:54 +00:00
download_state = State : : PARTIALLY_DOWNLOADED ;
}
2022-03-16 12:27:58 +00:00
try
{
2022-03-17 19:29:07 +00:00
completeImpl ( cache_lock , segment_lock , /* allow_non_strict_checking */ true ) ;
2022-03-16 12:27:58 +00:00
}
catch ( . . . )
{
if ( ! downloader_id . empty ( ) & & downloader_id = = getCallerIdImpl ( true ) )
downloader_id . clear ( ) ;
cv . notify_all ( ) ;
throw ;
2022-03-01 16:00:54 +00:00
}
2022-01-24 22:07:02 +00:00
cv . notify_all ( ) ;
}
2022-01-23 16:51:18 +00:00
2022-03-17 19:29:07 +00:00
void FileSegment : : completeImpl ( std : : lock_guard < std : : mutex > & cache_lock , std : : lock_guard < std : : mutex > & segment_lock , bool allow_non_strict_checking )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
bool is_last_holder = cache - > isLastFileSegmentHolder ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
2022-02-15 10:27:44 +00:00
2022-03-28 23:59:53 +00:00
if ( is_last_holder
2022-03-22 09:39:58 +00:00
& & ( download_state = = State : : PARTIALLY_DOWNLOADED | | download_state = = State : : PARTIALLY_DOWNLOADED_NO_CONTINUATION ) )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
size_t current_downloaded_size = getDownloadedSize ( segment_lock ) ;
if ( current_downloaded_size = = 0 )
{
download_state = State : : SKIP_CACHE ;
LOG_TEST ( log , " Remove cell {} (nothing downloaded) " , range ( ) . toString ( ) ) ;
cache - > remove ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
}
2022-03-29 12:08:24 +00:00
else
2022-03-22 09:39:58 +00:00
{
/**
* Only last holder of current file segment can resize the cell ,
* because there is an invariant that file segments returned to users
* in FileSegmentsHolder represent a contiguous range , so we can resize
* it only when nobody needs it .
*/
LOG_TEST ( log , " Resize cell {} to downloaded: {} " , range ( ) . toString ( ) , current_downloaded_size ) ;
cache - > reduceSizeToDownloaded ( key ( ) , offset ( ) , cache_lock , segment_lock ) ;
}
2022-01-23 16:51:18 +00:00
2022-03-29 12:08:24 +00:00
detached = true ;
2022-01-23 16:51:18 +00:00
2022-03-22 09:39:58 +00:00
if ( cache_writer )
2022-01-24 22:07:02 +00:00
{
2022-03-22 09:39:58 +00:00
cache_writer - > finalize ( ) ;
cache_writer . reset ( ) ;
remote_file_reader . reset ( ) ;
2022-01-24 22:07:02 +00:00
}
2022-01-22 22:56:24 +00:00
}
2022-03-22 09:39:58 +00:00
if ( ! downloader_id . empty ( ) & & ( downloader_id = = getCallerIdImpl ( allow_non_strict_checking ) | | is_last_holder ) )
2022-01-26 09:35:46 +00:00
{
2022-01-30 11:35:28 +00:00
LOG_TEST ( log , " Clearing downloader id: {}, current state: {} " , downloader_id , stateToString ( download_state ) ) ;
2022-01-24 22:07:02 +00:00
downloader_id . clear ( ) ;
2022-01-26 09:35:46 +00:00
}
2022-01-24 22:07:02 +00:00
2022-03-21 18:48:13 +00:00
assertCorrectnessImpl ( segment_lock ) ;
2022-01-22 22:56:24 +00:00
}
2022-02-02 14:25:25 +00:00
/// Human-readable description of the segment for logs; takes the segment mutex itself.
String FileSegment::getInfoForLog() const
{
    std::lock_guard segment_lock(mutex);
    String description = getInfoForLogImpl(segment_lock);
    return description;
}
2022-02-02 14:25:25 +00:00
2022-03-17 17:29:31 +00:00
/// Formats the segment's range, state, downloaded size, downloader id and caller id for
/// logging. The caller must hold the segment mutex.
///
/// This is a diagnostic helper, called e.g. from the error path in write(). It must not throw
/// just because there is no query id, so the lenient caller id is used. It yields the
/// placeholder " None " instead of throwing, as the strict getCallerId() would.
String FileSegment::getInfoForLogImpl(std::lock_guard<std::mutex> & segment_lock) const
{
    WriteBufferFromOwnString info;
    info << " File segment: " << range().toString() << " , ";
    info << " state: " << download_state << " , ";
    info << " downloaded size: " << getDownloadedSize(segment_lock) << " , ";
    info << " downloader id: " << downloader_id << " , ";
    info << " caller id: " << getCallerIdImpl(/* allow_non_strict_checking */ true);
    return info.str();
}
2022-01-26 18:43:23 +00:00
/// Maps a download state to the text used in logs and error messages.
String FileSegment::stateToString(FileSegment::State state)
{
    using S = FileSegment::State;

    /// Every enumerator returns; the switch has no default, so the compiler can warn about
    /// unhandled states.
    switch (state)
    {
        case S::EMPTY:                                return " EMPTY ";
        case S::DOWNLOADING:                          return " DOWNLOADING ";
        case S::DOWNLOADED:                           return " DOWNLOADED ";
        case S::PARTIALLY_DOWNLOADED:                 return " PARTIALLY DOWNLOADED ";
        case S::PARTIALLY_DOWNLOADED_NO_CONTINUATION: return " PARTIALLY DOWNLOADED NO CONTINUATION ";
        case S::SKIP_CACHE:                           return " SKIP_CACHE ";
    }
    __builtin_unreachable();
}
2022-03-21 18:48:13 +00:00
void FileSegment : : assertCorrectness ( ) const
{
std : : lock_guard segment_lock ( mutex ) ;
assertCorrectnessImpl ( segment_lock ) ;
}
void FileSegment : : assertCorrectnessImpl ( std : : lock_guard < std : : mutex > & /* segment_lock */ ) const
{
assert ( downloader_id . empty ( ) = = ( download_state ! = FileSegment : : State : : DOWNLOADING ) ) ;
assert ( ! downloader_id . empty ( ) = = ( download_state = = FileSegment : : State : : DOWNLOADING ) ) ;
assert ( download_state ! = FileSegment : : State : : DOWNLOADED | | std : : filesystem : : file_size ( cache - > getPathInLocalCache ( key ( ) , offset ( ) ) ) > 0 ) ;
}
2022-03-17 19:29:07 +00:00
/// Completes every file segment still held and drops it from the holder.
/// In CacheableReadBufferFromRemoteFS the downloader removes segments from the holder right
/// after calling complete() on them, so only uncompleted segments should remain here.
///
/// Error handling when complete() throws:
/// - release builds (NDEBUG): the error is logged and the segment is skipped;
/// - debug builds: the exception is rethrown, and since it escapes a destructor the process
///   terminates.
FileSegmentsHolder::~FileSegmentsHolder()
{
    IFileCache * cache = nullptr;

    for (auto file_segment_it = file_segments.begin(); file_segment_it != file_segments.end();)
    {
        auto current_file_segment_it = file_segment_it;
        auto & file_segment = *current_file_segment_it;

        if (!cache)
            cache = file_segment->cache;

        try
        {
            /// The segment pointer must be dropped right after complete() and under the same
            /// cache mutex, because complete() inspects the segment's reference holders.
            std::lock_guard cache_lock(cache->mutex);
            file_segment->complete(cache_lock);
            file_segment_it = file_segments.erase(current_file_segment_it);
        }
        catch (...)
        {
#ifdef NDEBUG
            tryLogCurrentException(__PRETTY_FUNCTION__);

            /// erase() was not reached, so file_segment_it still points at the failed segment.
            /// Advance it; otherwise the loop would call complete() on the same segment again
            /// and could spin forever. The skipped segment is released with the container.
            ++file_segment_it;
#else
            throw;
#endif
        }
    }
}
2022-02-12 22:20:05 +00:00
/// Joins the ranges of all held file segments with " , " separators.
String FileSegmentsHolder::toString()
{
    String result;
    for (const auto & segment : file_segments)
    {
        /// A separator is added only once something has been written.
        result += result.empty() ? "" : " , ";
        result += segment->range().toString();
    }
    return result;
}
2022-01-22 22:56:24 +00:00
}